""" Web method module. Handles web operations using the web service. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC import requests from bs4 import BeautifulSoup import time from modules.workflow.methodBase import MethodBase, ActionResult, action from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) class WebService: """Service for web operations like searching and crawling""" def __init__(self, serviceContainer: Any): self.serviceContainer = serviceContainer self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" self.timeout = 30 # Web search configuration from agentWebcrawler self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "") self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google") self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto") self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5")) if not self.srcApikey: logger.warning("SerpAPI key not configured for web search") async def searchWeb(self, query: str, maxResults: int = 10) -> Dict[str, Any]: """Search web content using Google search via SerpAPI""" try: if not self.srcApikey: return { "error": "SerpAPI key not configured", "query": query } # Get user language from service container if available userLanguage = "en" # Default language if hasattr(self.serviceContainer, 'user') and hasattr(self.serviceContainer.user, 'language'): userLanguage = self.serviceContainer.user.language # Format the search request for SerpAPI params = { "engine": self.srcEngine, "q": query, "api_key": self.srcApikey, "num": min(maxResults, self.maxResults), # Number of results to return "hl": userLanguage # User language } # Make the API request response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout) response.raise_for_status() # Parse JSON response search_results = response.json() # Extract organic results results = [] if "organic_results" in search_results: for result in search_results["organic_results"][:maxResults]: # Extract title title = result.get("title", "No title") # Extract URL url = result.get("link", "No URL") # Extract snippet snippet = result.get("snippet", "No description") # Get actual page content try: targetPageSoup = self._readUrl(url) content = self._extractMainContent(targetPageSoup) except Exception as e: logger.warning(f"Error extracting content from {url}: {str(e)}") content = f"Error extracting content: {str(e)}" results.append({ 'title': title, 'url': url, 'snippet': snippet, 'content': content }) # Limit number of results if len(results) >= maxResults: break else: logger.warning(f"No organic results found in SerpAPI response for: {query}") return { "query": query, "maxResults": maxResults, "results": results, "totalFound": len(results), "timestamp": datetime.now(UTC).isoformat() } except Exception as e: logger.error(f"Error searching web: {str(e)}") return { "error": str(e), "query": query } async def crawlPage(self, url: str, depth: int = 1, followLinks: bool = True, extractContent: bool = True) -> Dict[str, Any]: """Crawl web page and extract content""" try: # Read the URL soup = self._readUrl(url) if not soup: return { "error": "Failed to read URL", "url": url } # Extract basic information title = self._extractTitle(soup, url) content = self._extractMainContent(soup) if extractContent else "" # Extract links if requested links = [] if followLinks: for link in soup.find_all('a', href=True): href = link.get('href') if href and href.startswith(('http://', 'https://')): links.append({ 'url': href, 'text': link.get_text(strip=True)[:100] }) # Extract images images = [] for img in soup.find_all('img', src=True): src = img.get('src') if src: images.append({ 'src': src, 'alt': img.get('alt', ''), 'title': img.get('title', '') }) return { "url": url, "depth": depth, "followLinks": followLinks, "extractContent": extractContent, "title": title, "content": content, "links": links[:10], # Limit to first 10 links "images": images[:10], # Limit to first 10 images "timestamp": datetime.now(UTC).isoformat() } except Exception as e: logger.error(f"Error crawling web page: {str(e)}") return { "error": str(e), "url": url } async def extractContent(self, url: str, selectors: Dict[str, str] = None, format: str = "text") -> Dict[str, Any]: """Extract content from web page using selectors""" try: # Read the URL soup = self._readUrl(url) if not soup: return { "error": "Failed to read URL", "url": url } extracted_content = {} if selectors: # Extract content using provided selectors for selector_name, selector in selectors.items(): elements = soup.select(selector) if elements: if format == "text": extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] elif format == "html": extracted_content[selector_name] = [str(elem) for elem in elements] else: extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] else: extracted_content[selector_name] = [] else: # Auto-extract common elements extracted_content = { "title": self._extractTitle(soup, url), "main_content": self._extractMainContent(soup), "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])], "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))], "images": [img.get('src') for img in soup.find_all('img', src=True)] } return { "url": url, "selectors": selectors, "format": format, "content": extracted_content, "timestamp": datetime.now(UTC).isoformat() } except Exception as e: logger.error(f"Error extracting content: {str(e)}") return { "error": str(e), "url": url } async def validatePage(self, url: str, checks: List[str] = None) -> Dict[str, Any]: """Validate web page for various criteria""" if checks is None: checks = ["accessibility", "seo", "performance"] try: # Read the URL soup = self._readUrl(url) if not soup: return { "error": "Failed to read URL", "url": url } validation_results = {} for check in checks: if check == "accessibility": validation_results["accessibility"] = self._checkAccessibility(soup) elif check == "seo": validation_results["seo"] = self._checkSEO(soup) elif check == "performance": validation_results["performance"] = self._checkPerformance(soup, url) else: validation_results[check] = {"status": "unknown", "message": f"Unknown check type: {check}"} return { "url": url, "checks": checks, "results": validation_results, "timestamp": datetime.now(UTC).isoformat() } except Exception as e: logger.error(f"Error validating web page: {str(e)}") return { "error": str(e), "url": url } def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]: """Check basic accessibility features""" issues = [] warnings = [] # Check for alt text on images images_without_alt = soup.find_all('img', alt='') if images_without_alt: issues.append(f"Found {len(images_without_alt)} images without alt text") # Check for proper heading structure headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']) if not headings: warnings.append("No headings found - poor document structure") # Check for form labels forms = soup.find_all('form') for form in forms: inputs = form.find_all('input') for input_elem in inputs: if input_elem.get('type') not in ['submit', 'button', 'hidden']: if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}): warnings.append("Form input without proper label") return { "status": "warning" if warnings else "pass", "issues": issues, "warnings": warnings } def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]: """Check basic SEO features""" issues = [] warnings = [] # Check for title tag title = soup.find('title') if not title: issues.append("Missing title tag") elif len(title.get_text()) < 10: warnings.append("Title tag is too short") elif len(title.get_text()) > 60: warnings.append("Title tag is too long") # Check for meta description meta_desc = soup.find('meta', attrs={'name': 'description'}) if not meta_desc: warnings.append("Missing meta description") elif meta_desc.get('content'): if len(meta_desc.get('content')) < 50: warnings.append("Meta description is too short") elif len(meta_desc.get('content')) > 160: warnings.append("Meta description is too long") # Check for h1 tag h1_tags = soup.find_all('h1') if not h1_tags: warnings.append("No H1 tag found") elif len(h1_tags) > 1: warnings.append("Multiple H1 tags found") return { "status": "warning" if warnings else "pass", "issues": issues, "warnings": warnings } def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: """Check basic performance indicators""" warnings = [] # Count images images = soup.find_all('img') if len(images) > 20: warnings.append(f"Many images found ({len(images)}) - may impact loading speed") # Check for external resources external_scripts = soup.find_all('script', src=True) external_styles = soup.find_all('link', rel='stylesheet') if len(external_scripts) > 10: warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed") if len(external_styles) > 5: warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed") return { "status": "warning" if warnings else "pass", "warnings": warnings, "metrics": { "images": len(images), "external_scripts": len(external_scripts), "external_styles": len(external_styles) } } def _readUrl(self, url: str) -> BeautifulSoup: """Read a URL and return a BeautifulSoup parser for the content""" if not url or not url.startswith(('http://', 'https://')): return None headers = { 'User-Agent': self.user_agent, 'Accept': 'text/html,application/xhtml+xml,application/xml', 'Accept-Language': 'en-US,en;q=0.9', } try: # Initial request response = requests.get(url, headers=headers, timeout=self.timeout) # Handling for status 202 if response.status_code == 202: # Retry with backoff backoff_times = [0.5, 1.0, 2.0, 5.0] for wait_time in backoff_times: time.sleep(wait_time) response = requests.get(url, headers=headers, timeout=self.timeout) if response.status_code != 202: break # Raise for error status codes response.raise_for_status() # Parse HTML return BeautifulSoup(response.text, 'html.parser') except Exception as e: logger.error(f"Error reading URL {url}: {str(e)}") return None def _extractTitle(self, soup: BeautifulSoup, url: str) -> str: """Extract the title from a webpage""" if not soup: return f"Error with {url}" # Extract title from title tag title_tag = soup.find('title') title = title_tag.text.strip() if title_tag else "No title" # Alternative: Also look for h1 tags if title tag is missing if title == "No title": h1_tag = soup.find('h1') if h1_tag: title = h1_tag.text.strip() return title def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 10000) -> str: """Extract the main content from an HTML page""" if not soup: return "" # Try to find main content elements in priority order main_content = None for selector in ['main', 'article', '#content', '.content', '#main', '.main']: content = soup.select_one(selector) if content: main_content = content break # If no main content found, use the body if not main_content: main_content = soup.find('body') or soup # Remove script, style, nav, footer elements that don't contribute to main content for element in main_content.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'): element.extract() # Extract text content text_content = main_content.get_text(separator=' ', strip=True) # Limit to max_chars return text_content[:max_chars] class MethodWeb(MethodBase): """Web method implementation for web operations""" def __init__(self, serviceContainer: Any): """Initialize the web method""" super().__init__(serviceContainer) self.name = "web" self.description = "Handle web operations like searching and crawling" self.webService = WebService(serviceContainer) @action async def search(self, parameters: Dict[str, Any]) -> ActionResult: """Search web content""" try: query = parameters.get("query") maxResults = parameters.get("maxResults", 10) if not query: return self._createResult( success=False, data={}, error="Search query is required" ) # Search web results = await self.webService.searchWeb( query=query, maxResults=maxResults ) return self._createResult( success=True, data=results ) except Exception as e: logger.error(f"Error searching web: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def crawl(self, parameters: Dict[str, Any]) -> ActionResult: """Crawl web page""" try: url = parameters.get("url") depth = parameters.get("depth", 1) followLinks = parameters.get("followLinks", True) extractContent = parameters.get("extractContent", True) if not url: return self._createResult( success=False, data={}, error="URL is required" ) # Crawl page results = await self.webService.crawlPage( url=url, depth=depth, followLinks=followLinks, extractContent=extractContent ) return self._createResult( success=True, data=results ) except Exception as e: logger.error(f"Error crawling web page: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def extract(self, parameters: Dict[str, Any]) -> ActionResult: """Extract content from web page""" try: url = parameters.get("url") selectors = parameters.get("selectors", {}) format = parameters.get("format", "text") if not url: return self._createResult( success=False, data={}, error="URL is required" ) # Extract content content = await self.webService.extractContent( url=url, selectors=selectors, format=format ) return self._createResult( success=True, data=content ) except Exception as e: logger.error(f"Error extracting content: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def validate(self, parameters: Dict[str, Any]) -> ActionResult: """Validate web page""" try: url = parameters.get("url") checks = parameters.get("checks", ["accessibility", "seo", "performance"]) if not url: return self._createResult( success=False, data={}, error="URL is required" ) # Validate page results = await self.webService.validatePage( url=url, checks=checks ) return self._createResult( success=True, data=results ) except Exception as e: logger.error(f"Error validating web page: {str(e)}") return self._createResult( success=False, data={}, error=str(e) )