"""
Web operations method module.

Handles web scraping, crawling, and search operations.
"""

import copy
import json
import logging
import os
import random
import re
import time
from datetime import UTC, datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin, urlparse

import requests
from bs4 import BeautifulSoup

# Selenium imports for JavaScript-heavy pages
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from modules.chat.methodBase import MethodBase, action
from modules.interfaces.interfaceChatModel import ActionResult
from modules.shared.configuration import APP_CONFIG
from modules.shared.timezoneUtils import get_utc_timestamp

logger = logging.getLogger(__name__)


class MethodWeb(MethodBase):
    """
    Web method implementation for web operations.

    - web.search: Uses SerpAPI to find relevant URLs for a query. Returns the
      list of result URLs; it does NOT fetch or extract page content.
    - web.crawl: Fetches and extracts main content from a list of URLs read
      from a document. Uses a headless browser first (for JavaScript-heavy
      pages) and falls back to a plain HTTP request.
    - web.scrape: Extracts data from a single page, either with caller
      supplied CSS selectors or with a generic auto-extraction.
    """

    # Browser-like request headers. "Accept-Encoding" is intentionally NOT set:
    # requests/urllib3 only advertise encodings they can actually decode, while
    # a hard-coded "br" yields undecodable bodies when brotli is not installed.
    _BROWSER_HEADERS = {
        "User-Agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
            "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        ),
        "Accept": (
            "text/html,application/xhtml+xml,application/xml;q=0.9,"
            "image/webp,image/apng,*/*;q=0.8"
        ),
        "Accept-Language": "en-US,en;q=0.9,de;q=0.8",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Cache-Control": "max-age=0",
    }

    # User agent used for the single retry after a 403 response.
    _ALTERNATE_USER_AGENT = (
        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    )

    # Sleep intervals (seconds) between retries while a server answers 202.
    _ACCEPTED_BACKOFF_SECONDS = (1.0, 2.0, 5.0, 10.0)

    # Upper bound (seconds) for the wait after a 429 response.
    _RATE_LIMIT_MAX_WAIT_SECONDS = 30.0

    # CSS selectors tried, in priority order, to locate the main content.
    _CONTENT_SELECTORS = (
        "main", "article", "#content", ".content", "#main", ".main",
        ".post-content", ".entry-content", ".article-content",
        ".page-content", '[role="main"]', ".container", ".wrapper",
    )

    # CSS selectors of elements stripped from the main content before the
    # text is extracted.
    _NOISE_SELECTORS = (
        "script", "style", "noscript", "nav", "footer", "header", "aside",
        ".sidebar", "#sidebar", ".comments", "#comments", ".advertisement",
        ".ads", ".ad", ".banner", "iframe", ".social-share", ".share-buttons",
        ".breadcrumb", ".breadcrumbs", ".pagination", ".related-posts",
        ".related-articles", ".newsletter", ".subscribe", ".signup",
        ".cookie-notice", ".privacy-notice", ".popup", ".modal", ".overlay",
    )

    def __init__(self, serviceCenter: Any):
        """Read the SerpAPI settings from APP_CONFIG and set HTTP defaults."""
        super().__init__(serviceCenter)
        self.name = "web"
        self.description = "Handle web operations like search and crawling"
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(
            APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5")
        )
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30

    def _format_timestamp_for_filename(self) -> str:
        """Format current UTC timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    # ------------------------------------------------------------------
    # HTTP fetching
    # ------------------------------------------------------------------

    def _readUrl(self, url: str) -> Optional[BeautifulSoup]:
        """
        Fetch *url* with browser-like headers and parse it with BeautifulSoup.

        Returns the parsed document, or None when the URL is invalid, the
        request fails, or the server never answers with status 200. All
        request errors are logged and swallowed.
        """
        if not url or not url.startswith(("http://", "https://")):
            logger.error("Invalid URL: %s", url)
            return None

        try:
            # The context manager closes the pooled connections on every exit
            # path.
            with requests.Session() as session:
                session.headers.update(self._BROWSER_HEADERS)
                return self._fetchWithRetries(session, url)
        except requests.exceptions.Timeout:
            logger.error("Timeout reading URL: %s", url)
        except requests.exceptions.ConnectionError:
            logger.error("Connection error reading URL: %s", url)
        except requests.exceptions.RequestException as e:
            logger.error("Request error reading URL %s: %s", url, e)
        except Exception as e:
            logger.error("Unexpected error reading URL %s: %s", url, e)
        return None

    def _fetchWithRetries(
        self, session: requests.Session, url: str
    ) -> Optional[BeautifulSoup]:
        """Issue the GET request and apply the per-status retry policy."""

        def fetch() -> requests.Response:
            return session.get(url, timeout=self.timeout, allow_redirects=True)

        def parse(response: requests.Response) -> BeautifulSoup:
            return BeautifulSoup(response.text, "html.parser")

        response = fetch()
        status = response.status_code

        if status == 200:
            logger.debug("Successfully read URL: %s", url)
            return parse(response)

        if status == 202:
            # Accepted: the resource is not ready yet, poll with backoff.
            logger.info("Status 202 for %s, retrying with backoff...", url)
            for wait_time in self._ACCEPTED_BACKOFF_SECONDS:
                time.sleep(wait_time)
                retry_response = fetch()
                if retry_response.status_code == 200:
                    logger.debug("Successfully read URL after retry: %s", url)
                    return parse(retry_response)
                if retry_response.status_code != 202:
                    break
            logger.warning("Failed to read URL after retries: %s", url)
            return None

        if status in (301, 302, 307, 308):
            # Redirects are followed by requests; seeing one here means the
            # redirect chain could not be resolved.
            logger.warning("Unexpected redirect status %s for %s", status, url)
            return None

        if status == 403:
            logger.warning(
                "403 Forbidden for %s, trying with different user agent...", url
            )
            session.headers["User-Agent"] = self._ALTERNATE_USER_AGENT
            retry_response = fetch()
            if retry_response.status_code == 200:
                logger.debug(
                    "Successfully read URL with different user agent: %s", url
                )
                return parse(retry_response)
            logger.error("Still getting %s for %s", retry_response.status_code, url)
            return None

        if status == 429:
            wait_seconds = self._retryAfterSeconds(response)
            logger.warning(
                "Rate limited for %s, waiting %.0f seconds...", url, wait_seconds
            )
            time.sleep(wait_seconds)
            retry_response = fetch()
            if retry_response.status_code == 200:
                logger.debug("Successfully read URL after rate limit: %s", url)
                return parse(retry_response)
            logger.error(
                "Still getting %s after rate limit wait for %s",
                retry_response.status_code,
                url,
            )
            return None

        logger.error("HTTP %s for %s", status, url)
        return None

    def _retryAfterSeconds(self, response: requests.Response) -> float:
        """
        Return how long to wait after a 429 response.

        Uses the numeric Retry-After header when present, capped at
        _RATE_LIMIT_MAX_WAIT_SECONDS; otherwise (missing header or HTTP-date
        form) returns that maximum.
        """
        raw = (response.headers.get("Retry-After") or "").strip()
        if raw.isdecimal():
            return min(float(raw), self._RATE_LIMIT_MAX_WAIT_SECONDS)
        return self._RATE_LIMIT_MAX_WAIT_SECONDS

    # ------------------------------------------------------------------
    # Content extraction
    # ------------------------------------------------------------------

    def _extractTitle(self, soup: Optional[BeautifulSoup], url: str) -> str:
        """
        Extract the page title.

        Uses the <title> tag, falling back to the first <h1> when the title
        tag is missing or empty. Returns "No title" when neither yields text
        and "Error with <url>" when *soup* is None.
        """
        if soup is None:
            return f"Error with {url}"

        for tag_name in ("title", "h1"):
            tag = soup.find(tag_name)
            if tag is not None:
                text = tag.get_text().strip()
                if text:
                    return text
        return "No title"

    def _extractMainContent(
        self, soup: Optional[BeautifulSoup], max_chars: int = 50000
    ) -> str:
        """
        Extract the readable main text of an HTML page.

        The first element matching _CONTENT_SELECTORS (or the body) is copied,
        stripped of navigation/ads/scripts, and reduced to its text lines
        longer than 10 characters. If that yields fewer than 500 characters,
        the text of up to 20 distinct <p>/<div>/<section> elements is used
        instead. The result is limited to *max_chars*, cut at a sentence
        boundary when one is available.

        Returns an empty string when *soup* is None.
        """
        if soup is None:
            return ""

        main_content = None
        for selector in self._CONTENT_SELECTORS:
            main_content = soup.select_one(selector)
            if main_content is not None:
                logger.debug("Found main content using selector: %s", selector)
                break
        if main_content is None:
            main_content = soup.find("body") or soup
            logger.debug("Using body as main content")

        # Work on a copy so stripping noise does not mutate the caller's tree.
        # If copying fails the original element is used (and modified).
        try:
            working_copy = copy.copy(main_content)
        except Exception:
            working_copy = main_content

        for selector in self._NOISE_SELECTORS:
            for element in working_copy.select(selector):
                element.extract()

        raw_text = working_copy.get_text(separator="\n", strip=True)
        stripped_lines = (line.strip() for line in raw_text.split("\n"))
        # Very short lines are mostly menu entries / button labels.
        cleaned_content = "\n\n".join(
            line for line in stripped_lines if len(line) > 10
        )

        if len(cleaned_content) < 500:
            logger.debug("Content too short, trying alternative extraction...")
            paragraphs = self._collectParagraphs(soup, limit=20)
            if paragraphs:
                cleaned_content = "\n\n".join(paragraphs)

        cleaned_content = self._truncateAtSentence(cleaned_content, max_chars)
        logger.debug("Extracted %d characters of content", len(cleaned_content))
        return cleaned_content

    @staticmethod
    def _collectParagraphs(soup: BeautifulSoup, limit: int) -> List[str]:
        """
        Return the text of up to *limit* distinct <p>/<div>/<section> elements.

        Only texts longer than 20 characters are kept. Duplicates are skipped
        because a wrapper <div> frequently has exactly the same text as the
        single paragraph it contains.
        """
        seen = set()
        paragraphs: List[str] = []
        for node in soup.find_all(["p", "div", "section"]):
            text = node.get_text(strip=True)
            if len(text) > 20 and text not in seen:
                seen.add(text)
                paragraphs.append(text)
                if len(paragraphs) >= limit:
                    break
        return paragraphs

    @staticmethod
    def _truncateAtSentence(text: str, max_chars: int) -> str:
        """
        Limit *text* to *max_chars* characters.

        Cuts after the last ". " sentence boundary that fits. When no such
        boundary exists within the limit the text is cut hard at *max_chars*
        instead of being dropped entirely.
        """
        if len(text) <= max_chars:
            return text
        # Search one character past the limit so that a boundary whose period
        # is the last allowed character is still found.
        boundary = text.rfind(". ", 0, max_chars + 1)
        if boundary == -1:
            return text[:max_chars].strip()
        return text[: boundary + 1].strip()

    # ------------------------------------------------------------------
    # Page quality checks
    # ------------------------------------------------------------------

    @staticmethod
    def _checkStatus(issues: List[str], warnings: List[str]) -> str:
        """Map findings to a status: "fail" > "warning" > "pass"."""
        if issues:
            return "fail"
        return "warning" if warnings else "pass"

    def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """
        Check basic accessibility features.

        Reports images whose alt attribute is missing or empty as issues, and
        a missing heading structure or form inputs without a matching
        <label for=...> as warnings.
        """
        issues: List[str] = []
        warnings: List[str] = []

        # Explicit attribute test instead of find_all(alt=''), whose handling
        # of a missing attribute depends on BeautifulSoup's matching rules.
        images_without_alt = [
            img for img in soup.find_all("img") if not img.get("alt")
        ]
        if images_without_alt:
            issues.append(
                f"Found {len(images_without_alt)} images without alt text"
            )

        if not soup.find_all(["h1", "h2", "h3", "h4", "h5", "h6"]):
            warnings.append("No headings found - poor document structure")

        for form in soup.find_all("form"):
            for input_elem in form.find_all("input"):
                if input_elem.get("type") in ("submit", "button", "hidden"):
                    continue
                input_id = input_elem.get("id")
                if not input_id or not soup.find("label", attrs={"for": input_id}):
                    warnings.append("Form input without proper label")

        return {
            "status": self._checkStatus(issues, warnings),
            "issues": issues,
            "warnings": warnings,
        }

    def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """
        Check basic SEO features.

        A missing <title> is an issue; title length outside 10-60 characters,
        a missing/empty or badly sized meta description (50-160 characters),
        and zero or multiple <h1> tags are warnings.
        """
        issues: List[str] = []
        warnings: List[str] = []

        title = soup.find("title")
        if title is None:
            issues.append("Missing title tag")
        else:
            title_length = len(title.get_text().strip())
            if title_length < 10:
                warnings.append("Title tag is too short")
            elif title_length > 60:
                warnings.append("Title tag is too long")

        meta_desc = soup.find("meta", attrs={"name": "description"})
        description = (meta_desc.get("content") or "").strip() if meta_desc else ""
        if not description:
            # A description tag without content is as useless as none at all.
            warnings.append("Missing meta description")
        elif len(description) < 50:
            warnings.append("Meta description is too short")
        elif len(description) > 160:
            warnings.append("Meta description is too long")

        h1_tags = soup.find_all("h1")
        if not h1_tags:
            warnings.append("No H1 tag found")
        elif len(h1_tags) > 1:
            warnings.append("Multiple H1 tags found")

        return {
            "status": self._checkStatus(issues, warnings),
            "issues": issues,
            "warnings": warnings,
        }

    def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """
        Check basic performance indicators.

        Counts images, external scripts and stylesheets and warns when a
        count exceeds 20 / 10 / 5 respectively. *url* is currently unused.
        """
        warnings: List[str] = []

        image_count = len(soup.find_all("img"))
        script_count = len(soup.find_all("script", src=True))
        style_count = len(soup.find_all("link", rel="stylesheet"))

        if image_count > 20:
            warnings.append(
                f"Many images found ({image_count}) - may impact loading speed"
            )
        if script_count > 10:
            warnings.append(
                f"Many external scripts ({script_count}) - may impact loading speed"
            )
        if style_count > 5:
            warnings.append(
                f"Many external stylesheets ({style_count}) - may impact loading speed"
            )

        return {
            "status": "warning" if warnings else "pass",
            "warnings": warnings,
            "metrics": {
                "images": image_count,
                "external_scripts": script_count,
                "external_styles": style_count,
            },
        }

    def _detectJavaScriptRendering(self, soup: Optional[BeautifulSoup]) -> bool:
        """
        Detect if a page likely requires JavaScript rendering.

        True when a typical SPA mount point (<div id="root">, ng-app, ...) is
        present, or when the page has fewer than 100 characters of text but
        more than two <script> tags.
        """
        if soup is None:
            return False

        mount_point_attrs = (
            {"ng-app": True},
            {"id": "root"},
            {"id": "app"},
            {"id": "react-root"},
            {"id": "spa-root"},
            {"class": "spa-container"},
            {"data-reactroot": True},
            {"data-ng-controller": True},
        )
        if any(soup.find("div", attrs) is not None for attrs in mount_point_attrs):
            return True

        has_little_text = len(soup.get_text(strip=True)) < 100
        return has_little_text and len(soup.find_all("script")) > 2

    def _extractMetaInformation(
        self, soup: Optional[BeautifulSoup], url: str
    ) -> Dict[str, Any]:
        """
        Extract meta information (description, keywords, canonical URL, ...).

        Unknown values stay empty strings. Meta tags with empty content are
        ignored so they cannot overwrite a value found earlier. When *soup*
        is None only "url" and the error title are filled in.
        """
        meta_info = {
            "url": url,
            "title": self._extractTitle(soup, url),
            "description": "",
            "keywords": "",
            "author": "",
            "language": "",
            "robots": "",
            "viewport": "",
            "charset": "",
            "canonical": "",
        }
        if soup is None:
            return meta_info

        # <meta name=...> values that map 1:1 onto a meta_info key.
        named_fields = (
            "description", "keywords", "author", "language", "robots", "viewport",
        )
        # Open Graph properties and the meta_info key they populate.
        og_fields = {
            "og:description": "description",
            "og:title": "title",
            "og:url": "canonical",
        }

        for meta in soup.find_all("meta"):
            content = meta.get("content", "")
            if not content:
                continue
            name = (meta.get("name") or "").lower()
            prop = (meta.get("property") or "").lower()
            if name in named_fields:
                meta_info[name] = content
            elif prop in og_fields:
                meta_info[og_fields[prop]] = content

        charset_meta = soup.find("meta", charset=True)
        if charset_meta is not None:
            meta_info["charset"] = charset_meta.get("charset", "")

        # An explicit <link rel="canonical"> wins over og:url.
        canonical_link = soup.find("link", rel="canonical")
        if canonical_link is not None and canonical_link.get("href"):
            meta_info["canonical"] = canonical_link.get("href")

        return meta_info

    def _getAlternativeApproaches(
        self, url: str, requires_js: bool, content_length: int
    ) -> List[str]:
        """
        Get suggestions for sites that are difficult to crawl.

        Adds JavaScript-related hints when *requires_js* is set, anti-bot
        hints when fewer than 100 characters were extracted, and always
        appends the general suggestions. *url* is currently unused.
        """
        approaches: List[str] = []

        if requires_js:
            approaches.extend([
                "Site requires JavaScript rendering - consider using a headless browser",
                "Try accessing the site's API endpoints directly",
                "Look for RSS feeds or sitemaps",
                "Check if the site has a mobile version that's easier to parse",
            ])

        if content_length < 100:
            approaches.extend([
                "Site may have anti-bot protection - try with different user agents",
                "Check if the site requires authentication",
                "Look for alternative URLs (www vs non-www, http vs https)",
                "Try accessing the site's robots.txt for crawling guidelines",
            ])

        approaches.extend([
            "Use the web.search action to find alternative sources",
            "Try the web.scrape action with specific CSS selectors",
            "Check if the site has a public API or data export",
        ])
        return approaches

    @staticmethod
    def _resolveOutputFormat(
        expectedDocumentFormats: Optional[List[Dict[str, Any]]],
        default_extension: str,
        default_mime_type: str,
    ) -> tuple:
        """
        Return (extension, mimeType) for an action's output document.

        The first entry of *expectedDocumentFormats* wins; keys missing from
        it, or an empty list, fall back to the given defaults.
        """
        if not expectedDocumentFormats:
            return default_extension, default_mime_type
        expected_format = expectedDocumentFormats[0]
        extension = expected_format.get("extension", default_extension)
        mime_type = expected_format.get("mimeType", default_mime_type)
        logger.info("Using expected format: %s (%s)", extension, mime_type)
        return extension, mime_type

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------

    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Perform a web search and output a document with the list of result URLs.

        Parameters:
            query (str): Search query to perform
            maxResults (int, optional): Maximum number of results (default: 10);
                additionally capped by the configured
                Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS
            filter (str, optional): Filter criteria for search results
            expectedDocumentFormats (list, optional): Expected document formats
                with extension, mimeType, description
        """
        try:
            query = parameters.get("query")
            filter_param = parameters.get("filter")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not query:
                return ActionResult.isFailure(error="Search query is required")
            if not self.srcApikey:
                return ActionResult.isFailure(error="SerpAPI key not configured")

            try:
                max_results = int(parameters.get("maxResults", 10))
            except (TypeError, ValueError):
                return ActionResult.isFailure(error="maxResults must be an integer")
            # Never ask for less than one result or more than configured.
            result_limit = max(1, min(max_results, self.maxResults))

            user = getattr(getattr(self, "service", None), "user", None)
            userLanguage = getattr(user, "language", None) or "en"

            params = {
                "engine": self.srcEngine,
                "q": query,
                "api_key": self.srcApikey,
                "num": result_limit,
                "hl": userLanguage,
            }
            if filter_param:
                params["filter"] = filter_param

            response = requests.get(
                "https://serpapi.com/search", params=params, timeout=self.timeout
            )
            response.raise_for_status()
            search_results = response.json()

            results = search_results.get("organic_results", [])[:result_limit]

            # SerpAPI exposes the target URL of an organic result as "link";
            # "url" is accepted as well for other result shapes.
            urls = []
            for item in results:
                link = item.get("link") or item.get("url")
                if isinstance(link, str) and link:
                    urls.append(link)
            url_list_str = "\n".join(urls)

            if not expectedDocumentFormats:
                logger.info("No expected format specified, using default .txt format")
            output_extension, output_mime_type = self._resolveOutputFormat(
                expectedDocumentFormats, ".txt", "text/plain"
            )

            result_data = {
                "query": query,
                "maxResults": max_results,
                "filter": filter_param,
                "totalResults": len(urls),
                "urls": urls,
                "urlList": url_list_str,
                "timestamp": get_utc_timestamp(),
            }

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": (
                            f"web_search_{self._format_timestamp_for_filename()}"
                            f"{output_extension}"
                        ),
                        "documentData": result_data,
                        "mimeType": output_mime_type,
                    }
                ],
            )

        except Exception as e:
            logger.exception("Error searching web: %s", e)
            return ActionResult(success=False, error=str(e))

    def _selenium_fetch_soup(self, url: str) -> Optional[BeautifulSoup]:
        """
        Render *url* in headless Chrome and return the parsed page source.

        Returns None when the browser cannot be started or the page fails to
        load. The browser is always shut down, including on errors.
        """
        options = Options()
        # The Options.headless property is gone in current Selenium releases;
        # the command line switch works with every version.
        options.add_argument("--headless")
        options.add_argument("--no-sandbox")
        options.add_argument("--disable-dev-shm-usage")
        options.add_argument(f"user-agent={self.user_agent}")

        driver = None
        try:
            driver = webdriver.Chrome(options=options)
            driver.set_page_load_timeout(self.timeout)
            driver.get(url)
            # Wait for body to load
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.TAG_NAME, "body"))
            )
            return BeautifulSoup(driver.page_source, "html.parser")
        except WebDriverException as e:
            logger.warning("Selenium failed for %s: %s", url, e)
            return None
        except Exception as e:
            logger.warning("Selenium error for %s: %s", url, e)
            return None
        finally:
            if driver is not None:
                try:
                    driver.quit()
                except Exception as e:
                    logger.debug("Failed to shut down Selenium driver: %s", e)

    def _selenium_extract_content(self, url: str) -> Optional[str]:
        """Use Selenium to fetch and extract main content from a JS-heavy page."""
        soup = self._selenium_fetch_soup(url)
        if soup is None:
            return None
        try:
            return self._extractMainContent(soup)
        except Exception as e:
            logger.warning("Selenium error for %s: %s", url, e)
            return None

    def _crawlUrl(self, url: str) -> Dict[str, Any]:
        """
        Fetch one URL and return its crawl result entry.

        Tries the headless browser first and falls back to a plain HTTP
        request when that yields no content. The title is taken from the
        page that supplied the content.
        """
        soup = self._selenium_fetch_soup(url)
        content = self._extractMainContent(soup)
        if not content:
            # Fallback to requests/BeautifulSoup
            fallback_soup = self._readUrl(url)
            if fallback_soup is not None:
                soup = fallback_soup
                content = self._extractMainContent(fallback_soup)

        title = self._extractTitle(soup, url) if soup is not None else "No title"
        content_length = len(content) if content else 0
        return {
            "url": url,
            "title": title,
            "content": content,
            "content_length": content_length,
            "meta_info": {"url": url, "title": title},
            "timestamp": get_utc_timestamp(),
        }

    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl a list of URLs provided in a document (.txt) with URLs separated
        by newline, comma, or semicolon.

        Parameters:
            document (str): Path of the document containing the URL list
            expectedDocumentFormats (list, optional): Expected document formats
                with extension, mimeType, description
        """
        try:
            document = parameters.get("document")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not document:
                return ActionResult.isFailure(
                    error="No document with URL list provided."
                )

            with open(document, "r", encoding="utf-8") as f:
                url_list_text = f.read()

            urls = [
                candidate.strip()
                for candidate in re.split(r"[\n,;]+", url_list_text)
                if candidate.strip()
            ]
            if not urls:
                return ActionResult.isFailure(
                    error="No valid URLs provided in the document."
                )

            crawl_results = []
            for url in urls:
                # One failing URL must not abort the whole crawl.
                try:
                    logger.info("Crawling URL: %s", url)
                    entry = self._crawlUrl(url)
                    crawl_results.append(entry)
                    logger.info(
                        "Successfully crawled %s - extracted %s characters",
                        url,
                        entry["content_length"],
                    )
                except Exception as e:
                    logger.error("Error crawling web page %s: %s", url, e)
                    crawl_results.append({
                        "error": str(e),
                        "url": url,
                        "suggestions": [
                            "Check if the URL is accessible",
                            "Try with a different user agent",
                            "Verify the site doesn't block automated access",
                        ],
                    })

            if not expectedDocumentFormats:
                logger.info("No expected format specified, using default .json format")
            output_extension, output_mime_type = self._resolveOutputFormat(
                expectedDocumentFormats, ".json", "application/json"
            )

            failed_crawls = sum(1 for r in crawl_results if "error" in r)
            result_data = {
                "urls": urls,
                "maxDepth": 1,  # Simplified crawl
                "includeImages": False,
                "followLinks": True,
                "crawlResults": crawl_results,
                "summary": {
                    "total_urls": len(urls),
                    "successful_crawls": len(crawl_results) - failed_crawls,
                    "failed_crawls": failed_crawls,
                    "total_content_chars": sum(
                        r.get("content_length", 0) for r in crawl_results
                    ),
                },
                "timestamp": get_utc_timestamp(),
            }

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": (
                            f"web_crawl_{self._format_timestamp_for_filename()}"
                            f"{output_extension}"
                        ),
                        "documentData": result_data,
                        "mimeType": output_mime_type,
                    }
                ],
            )

        except Exception as e:
            logger.exception("Error crawling web pages: %s", e)
            return ActionResult(success=False, error=str(e))

    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Scrape specific data from a web page.

        Parameters:
            url (str): URL to scrape
            selectors (Dict[str, str], optional): CSS selectors for data
                extraction, keyed by result name. When omitted, title, main
                content, headings, absolute links and image sources are
                extracted automatically.
            format (str, optional): Output format (default: "json"); "html"
                returns the matched elements' markup, anything else their text
            expectedDocumentFormats (list, optional): Expected document formats
                with extension, mimeType, description
        """
        try:
            url = parameters.get("url")
            selectors = parameters.get("selectors")
            output_format = parameters.get("format", "json")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not url:
                return ActionResult(success=False, error="URL is required")

            soup = self._readUrl(url)
            if soup is None:
                return ActionResult(success=False, error="Failed to read URL")

            if selectors:
                # Extract content using provided selectors
                extracted_content = {}
                for selector_name, selector in selectors.items():
                    elements = soup.select(selector)
                    if output_format == "html":
                        extracted_content[selector_name] = [
                            str(elem) for elem in elements
                        ]
                    else:
                        extracted_content[selector_name] = [
                            elem.get_text(strip=True) for elem in elements
                        ]
            else:
                # Auto-extract common elements
                extracted_content = {
                    "title": self._extractTitle(soup, url),
                    "main_content": self._extractMainContent(soup),
                    "headings": [
                        h.get_text(strip=True)
                        for h in soup.find_all(["h1", "h2", "h3"])
                    ],
                    "links": [
                        a.get("href")
                        for a in soup.find_all("a", href=True)
                        if a.get("href").startswith(("http://", "https://"))
                    ],
                    "images": [
                        img.get("src") for img in soup.find_all("img", src=True)
                    ],
                }

            scrape_result = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "content": extracted_content,
                "timestamp": get_utc_timestamp(),
            }

            result_data = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "scrapedData": scrape_result,
                "timestamp": get_utc_timestamp(),
            }

            if not expectedDocumentFormats:
                logger.info(
                    "No expected format specified, using format parameter: %s",
                    output_format,
                )
            output_extension, output_mime_type = self._resolveOutputFormat(
                expectedDocumentFormats, f".{output_format}", "application/json"
            )

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": (
                            f"web_scrape_{self._format_timestamp_for_filename()}"
                            f"{output_extension}"
                        ),
                        "documentData": result_data,
                        "mimeType": output_mime_type,
                    }
                ],
            )

        except Exception as e:
            logger.exception("Error scraping web page: %s", e)
            return ActionResult(success=False, error=str(e))