# gateway/modules/methods/methodWeb.py
# Last modified: 2025-06-10 18:19:33 +02:00 (469 lines, 20 KiB, no trailing newline)
from typing import Dict, Any, Optional
import logging
import aiohttp
import asyncio
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import re
from datetime import datetime, UTC
import requests
import time
import json
from modules.methods.methodBase import MethodBase, AuthSource, MethodResult
from modules.shared.configuration import APP_CONFIG
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
class MethodWeb(MethodBase):
    """Web method implementation for web operations"""

    def __init__(self):
        # Identity/metadata consumed by the surrounding method framework.
        super().__init__()
        self.name = "web"
        self.description = "Handle web operations like search, crawl, and content extraction"
        self.auth_source = AuthSource.LOCAL # Web operations typically don't need auth
        # Web crawling configuration from agentWebcrawler
        # SerpAPI settings pulled from the shared APP_CONFIG mapping.
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        # Numeric settings arrive as strings; int() will raise ValueError on malformed config.
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
        self.timeout = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_TIMEOUT", "30"))
        self.userAgent = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
        # A missing key is logged but deliberately not fatal: non-search
        # actions (crawl/extract/fetch) can still run without SerpAPI.
        if not self.srcApikey:
            logger.error("SerpAPI key not configured")
@property
def actions(self) -> Dict[str, Dict[str, Any]]:
    """Declare the public actions, their retry/timeout policy, and parameter schemas."""
    # Parameter schemas are built separately so each action entry stays readable.
    searchParams = {
        "query": {"type": "string", "required": True},
        "maxResults": {"type": "number", "required": False},
        "filters": {"type": "object", "required": False},
        "searchEngine": {"type": "string", "required": False},
    }
    crawlParams = {
        "url": {"type": "string", "required": True},
        "depth": {"type": "number", "required": False},
        "followLinks": {"type": "boolean", "required": False},
        "includeImages": {"type": "boolean", "required": False},
        "respectRobots": {"type": "boolean", "required": False},
    }
    extractParams = {
        "url": {"type": "string", "required": True},
        "selectors": {"type": "array", "items": "string", "required": False},
        "format": {"type": "string", "required": False},
        "includeMetadata": {"type": "boolean", "required": False},
    }
    return {
        "search": {
            "description": "Search web content",
            "retryMax": 3,
            "timeout": 30,
            "parameters": searchParams,
        },
        "crawl": {
            "description": "Crawl web pages",
            "retryMax": 2,
            "timeout": 60,
            "parameters": crawlParams,
        },
        "extract": {
            "description": "Extract content from web page",
            "retryMax": 2,
            "timeout": 30,
            "parameters": extractParams,
        },
    }
async def execute(self, action: str, parameters: Dict[str, Any], authData: Optional[Dict[str, Any]] = None) -> MethodResult:
    """Execute a web action.

    Args:
        action: Action name. Accepts the publicly declared actions
            ("search", "crawl", "extract") as well as the legacy internal
            names ("fetchUrl", "parseContent", "extractData").
        parameters: Action-specific parameters (see the `actions` property).
        authData: Unused; web operations run without authentication.

    Returns:
        MethodResult carrying either the action payload or an error message.
    """
    try:
        # Validate parameters
        if not await self.validateParameters(action, parameters):
            return self._createResult(
                success=False,
                data={"error": f"Invalid parameters for {action}"}
            )
        # Dispatch table. Fix: the `actions` property declares
        # search/crawl/extract, but the previous dispatch only knew the
        # internal names, so every declared action fell through to
        # "Unknown action". Both naming schemes are routed here.
        # NOTE(review): the declared "extract" action takes a url while
        # _extractData expects pre-fetched "content" — confirm the
        # intended contract with callers.
        handlers = {
            "search": self._search_web,
            "crawl": self._crawl_page,
            "extract": self._extractData,
            "fetchUrl": self._fetchUrl,
            "parseContent": self._parseContent,
            "extractData": self._extractData,
        }
        handler = handlers.get(action)
        if handler is None:
            return self._createResult(
                success=False,
                data={"error": f"Unknown action: {action}"}
            )
        return await handler(parameters)
    except Exception as e:
        logger.error(f"Error executing web {action}: {e}")
        return self._createResult(
            success=False,
            data={"error": str(e)}
        )
async def _fetchUrl(self, parameters: Dict[str, Any]) -> MethodResult:
    """Fetch raw content from a URL.

    Parameters dict keys:
        url (required), method (default "GET"), headers (default {}),
        data (optional request body), timeout (seconds, default 30).

    Returns:
        MethodResult with url, HTTP status, response headers, and body text.
    """
    try:
        url = parameters["url"]
        method = parameters.get("method", "GET")
        headers = parameters.get("headers", {})
        data = parameters.get("data")
        timeout = parameters.get("timeout", 30)
        # Fix: aiohttp expects a ClientTimeout object; passing a bare
        # number to `timeout=` is deprecated and rejected by recent
        # aiohttp releases.
        clientTimeout = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method=method,
                url=url,
                headers=headers,
                data=data,
                timeout=clientTimeout
            ) as response:
                content = await response.text()
                return self._createResult(
                    success=True,
                    data={
                        "url": url,
                        "status": response.status,
                        "headers": dict(response.headers),
                        "content": content
                    }
                )
    except Exception as e:
        logger.error(f"Error fetching URL: {e}")
        return self._createResult(
            success=False,
            data={"error": f"Fetch failed: {str(e)}"}
        )
async def _parseContent(self, parameters: Dict[str, Any]) -> MethodResult:
    """Parse raw web content (HTML or JSON) into a structured payload.

    For HTML, returns the title, visible text, and all link/image URLs;
    for JSON, returns the decoded object. Any other contentType fails.
    """
    try:
        raw = parameters["content"]
        kind = parameters.get("contentType", "html")
        if kind == "html":
            soup = BeautifulSoup(raw, "html.parser")
            pageTitle = soup.title.string if soup.title else None
            hrefs = [anchor.get("href") for anchor in soup.find_all("a", href=True)]
            srcs = [tag.get("src") for tag in soup.find_all("img", src=True)]
            payload = {
                "type": "html",
                "title": pageTitle,
                "text": soup.get_text(),
                "links": hrefs,
                "images": srcs
            }
        elif kind == "json":
            payload = {
                "type": "json",
                "data": json.loads(raw)
            }
        else:
            raise ValueError(f"Unsupported content type: {kind}")
        return self._createResult(success=True, data=payload)
    except Exception as e:
        logger.error(f"Error parsing content: {e}")
        return self._createResult(
            success=False,
            data={"error": f"Parse failed: {str(e)}"}
        )
async def _extractData(self, parameters: Dict[str, Any]) -> MethodResult:
    """Extract named values from content via CSS selectors or dotted JSON paths.

    NOTE(review): `selectors` is consumed here as a mapping
    {name: selector/path}, while the `actions` schema declares it as an
    array — confirm which contract callers actually rely on.
    """
    try:
        raw = parameters["content"]
        kind = parameters.get("contentType", "html")
        selectors = parameters["selectors"]
        if kind == "html":
            soup = BeautifulSoup(raw, "html.parser")
            extracted = {}
            for name, css in selectors.items():
                matches = soup.select(css)
                # A single match collapses to a scalar; otherwise a list.
                if len(matches) == 1:
                    extracted[name] = matches[0].get_text().strip()
                else:
                    extracted[name] = [node.get_text().strip() for node in matches]
            return self._createResult(
                success=True,
                data={
                    "type": "html",
                    "results": extracted
                }
            )
        elif kind == "json":
            document = json.loads(raw)
            extracted = {}
            for name, path in selectors.items():
                # Walk dotted paths like "a.b.0.c"; a dead end yields None.
                node = document
                for segment in path.split("."):
                    if isinstance(node, dict):
                        node = node.get(segment)
                    elif isinstance(node, list) and segment.isdigit():
                        node = node[int(segment)]
                    else:
                        node = None
                        break
                extracted[name] = node
            return self._createResult(
                success=True,
                data={
                    "type": "json",
                    "results": extracted
                }
            )
        else:
            raise ValueError(f"Unsupported content type: {kind}")
    except Exception as e:
        logger.error(f"Error extracting data: {e}")
        return self._createResult(
            success=False,
            data={"error": f"Extract failed: {str(e)}"}
        )
async def _search_web(self, parameters: Dict[str, Any]) -> MethodResult:
    """Search the web via the requested engine (google or bing)."""
    try:
        query = parameters["query"]
        maxResults = parameters.get("maxResults", 10)
        filters = parameters.get("filters", {})
        searchEngine = parameters.get("searchEngine", "google")
        # Engine comparison is case-insensitive; the original casing is
        # echoed back in the result payload.
        engine = searchEngine.lower()
        if engine == "google":
            # TODO: Implement Google Custom Search API integration
            results = await self._google_search(query, maxResults, filters)
        elif engine == "bing":
            # TODO: Implement Bing Web Search API integration
            results = await self._bing_search(query, maxResults, filters)
        else:
            return self._createResult(
                success=False,
                data={"error": f"Unsupported search engine: {searchEngine}"}
            )
        return self._createResult(
            success=True,
            data={
                "query": query,
                "engine": searchEngine,
                "results": results
            }
        )
    except Exception as e:
        logger.error(f"Error searching web: {e}")
        return self._createResult(
            success=False,
            data={"error": f"Search failed: {str(e)}"}
        )
async def _google_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list:
"""Search using Google Custom Search API"""
# TODO: Implement Google Custom Search API
# This is a placeholder implementation
return [
{
"title": "Example Result",
"url": "https://example.com",
"snippet": "Example search result snippet",
"source": "google"
}
]
async def _bing_search(self, query: str, max_results: int, filters: Dict[str, Any]) -> list:
"""Search using Bing Web Search API"""
# TODO: Implement Bing Web Search API
# This is a placeholder implementation
return [
{
"title": "Example Result",
"url": "https://example.com",
"snippet": "Example search result snippet",
"source": "bing"
}
]
async def _crawl_page(self, parameters: Dict[str, Any]) -> MethodResult:
    """Crawl a single page and extract title, description, text, links, and images.

    Parameters dict keys:
        url (required), depth (accepted but currently unused — only the
        given page is fetched), followLinks (collect absolute link URLs),
        includeImages (collect image URLs), respectRobots (honor robots.txt).

    Returns:
        MethodResult with the extracted page data, or an error when the
        fetch fails or robots.txt disallows crawling.
    """
    try:
        url = parameters["url"]
        depth = parameters.get("depth", 1)  # NOTE(review): never used — single-page crawl only
        followLinks = parameters.get("followLinks", False)
        includeImages = parameters.get("includeImages", False)
        respectRobots = parameters.get("respectRobots", True)
        # Check robots.txt if required
        if respectRobots:
            if not await self._check_robots_txt(url):
                return self._createResult(
                    success=False,
                    data={"error": "Crawling not allowed by robots.txt"}
                )
        # Consistency fix: send the configured User-Agent and timeout on
        # the page fetch, as _check_robots_txt already does; previously
        # this request had neither and could hang indefinitely.
        requestHeaders = {"User-Agent": self.userAgent}
        requestTimeout = aiohttp.ClientTimeout(total=self.timeout)
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=requestHeaders, timeout=requestTimeout) as response:
                if response.status == 200:
                    html = await response.text()
                    soup = BeautifulSoup(html, 'html.parser')
                    # Extract basic information
                    result = {
                        "url": url,
                        "title": soup.title.string if soup.title else None,
                        "description": self._get_meta_description(soup),
                        "links": [],
                        "images": [] if includeImages else None,
                        "text": soup.get_text(strip=True),
                        "crawled": datetime.now(UTC).isoformat()
                    }
                    # Extract links if followLinks is True
                    if followLinks:
                        baseUrl = url
                        for link in soup.find_all('a'):
                            href = link.get('href')
                            if href:
                                # Resolve relative hrefs against the page URL.
                                absoluteUrl = urljoin(baseUrl, href)
                                if self._is_valid_url(absoluteUrl):
                                    result["links"].append({
                                        "url": absoluteUrl,
                                        "text": link.get_text(strip=True)
                                    })
                    # Extract images if includeImages is True
                    if includeImages:
                        for img in soup.find_all('img'):
                            src = img.get('src')
                            if src:
                                absoluteSrc = urljoin(url, src)
                                result["images"].append({
                                    "url": absoluteSrc,
                                    "alt": img.get('alt', ''),
                                    "title": img.get('title', '')
                                })
                    return self._createResult(
                        success=True,
                        data=result
                    )
                else:
                    return self._createResult(
                        success=False,
                        data={"error": f"Failed to fetch URL: {response.status}"}
                    )
    except Exception as e:
        logger.error(f"Error crawling page: {e}")
        return self._createResult(
            success=False,
            data={"error": f"Crawl failed: {str(e)}"}
        )
def _get_meta_description(self, soup: BeautifulSoup) -> Optional[str]:
    """Return the page's <meta name="description"> content, or None if absent."""
    tag = soup.find('meta', attrs={'name': 'description'})
    return tag.get('content') if tag else None
def _is_valid_url(self, url: str) -> bool:
"""Check if URL is valid"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc])
except:
return False
async def _check_robots_txt(self, url: str) -> bool:
    """Check if URL is allowed by robots.txt.

    Fetches <scheme>://<host>/robots.txt and does a minimal line-based
    parse. Fails open: a missing robots.txt or any error returns True.

    NOTE(review): known simplifications in this parser —
      * every line is lowercased, which also lowercases Disallow paths;
        path matching below is therefore effectively case-insensitive,
        while real URL paths are case-sensitive — confirm acceptable.
      * the user-agent match compares against the full configured UA
        string, so product-token groups (e.g. "User-agent: mybot") will
        generally not match; in practice only the "*" group applies.
      * `timeout=self.timeout` passes a bare int to aiohttp — works on
        older aiohttp, deprecated in recent versions.
    """
    try:
        parsedUrl = urlparse(url)
        robotsUrl = f"{parsedUrl.scheme}://{parsedUrl.netloc}/robots.txt"
        async with aiohttp.ClientSession() as session:
            async with session.get(robotsUrl, headers={"User-Agent": self.userAgent}, timeout=self.timeout) as response:
                if response.status == 200:
                    robotsContent = await response.text()
                    # Parse robots.txt content
                    userAgent = "*" # Default to all user agents
                    disallowPaths = []
                    for line in robotsContent.splitlines():
                        line = line.strip().lower()
                        if line.startswith("user-agent:"):
                            # Track which group we are inside; only "*" or our
                            # own UA's Disallow lines are collected below.
                            userAgent = line[11:].strip()
                        elif line.startswith("disallow:") and userAgent in ["*", self.userAgent.lower()]:
                            path = line[9:].strip()
                            if path:
                                disallowPaths.append(path)
                    # Check if URL path is disallowed
                    urlPath = parsedUrl.path
                    for disallowPath in disallowPaths:
                        # Prefix match per the de-facto robots.txt convention.
                        if urlPath.startswith(disallowPath):
                            return False
                    return True
                else:
                    # If robots.txt doesn't exist, assume crawling is allowed
                    return True
    except Exception as e:
        logger.warning(f"Error checking robots.txt for {url}: {str(e)}")
        # If there's an error, assume crawling is allowed
        return True
def _detect_language(self, soup: BeautifulSoup) -> str:
    """Best-effort detection of the page language; falls back to 'en'.

    Tries, in order: the <html lang> attribute, a content-language meta
    tag, a UTF-8 charset hint, and finally a crude stop-word count over
    the visible text.
    """
    try:
        # 1) Explicit <html lang="..."> attribute wins outright.
        if soup.html and soup.html.get('lang'):
            return soup.html.get('lang')
        # 2) <meta http-equiv="content-language" content="...">
        langMeta = soup.find('meta', attrs={'http-equiv': 'content-language'})
        if langMeta:
            return langMeta.get('content', 'en')
        # 3) Charset hint — UTF-8 alone carries no language info, so
        #    default to English when that's all we have.
        charsetMeta = soup.find('meta', attrs={'charset': True})
        if charsetMeta:
            declaredCharset = charsetMeta.get('charset', '').lower()
            if 'utf-8' in declaredCharset:
                return 'en'
        # 4) Stop-word heuristic: score each candidate language by how
        #    many of its common words appear in the page text.
        pageText = soup.get_text().lower()
        padded = f' {pageText} '
        stopWords = {
            'en': ['the', 'and', 'of', 'to', 'in', 'is', 'that', 'for', 'it', 'with'],
            'es': ['el', 'la', 'los', 'las', 'de', 'y', 'en', 'que', 'por', 'con'],
            'fr': ['le', 'la', 'les', 'de', 'et', 'en', 'que', 'pour', 'avec', 'dans'],
            'de': ['der', 'die', 'das', 'und', 'in', 'den', 'von', 'zu', 'für', 'mit']
        }
        scores = {}
        for lang, words in stopWords.items():
            scores[lang] = sum(1 for word in words if f' {word} ' in padded)
        if scores:
            # Ties resolve to the first language in insertion order.
            return max(scores.items(), key=lambda item: item[1])[0]
        return 'en' # Default to English if no language detected
    except Exception as e:
        logger.warning(f"Error detecting language: {str(e)}")
        return 'en' # Default to English on error