""" Web method module. Handles web operations using the web service. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC import requests from bs4 import BeautifulSoup import time import uuid import json # Added for JSON parsing from modules.chat.methodBase import MethodBase, ActionResult, action from modules.shared.configuration import APP_CONFIG logger = logging.getLogger(__name__) class MethodWeb(MethodBase): """Web method implementation for web operations""" def __init__(self, serviceCenter: Any): """Initialize the web method""" super().__init__(serviceCenter) self.name = "web" self.description = "Handle web operations like crawling and scraping" # Web search configuration from agentWebcrawler self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "") self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google") self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto") self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5")) if not self.srcApikey: logger.warning("SerpAPI key not configured for web search") self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" self.timeout = 30 def _readUrl(self, url: str) -> BeautifulSoup: """Read a URL and return a BeautifulSoup parser for the content with enhanced error handling""" if not url or not url.startswith(('http://', 'https://')): logger.error(f"Invalid URL: {url}") return None # Enhanced headers to mimic real browser headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8', 'Accept-Language': 'en-US,en;q=0.9,de;q=0.8', 'Accept-Encoding': 'gzip, deflate, br', 'DNT': '1', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Cache-Control': 'max-age=0' } try: # Use session for better connection handling session = requests.Session() session.headers.update(headers) # Initial request with allow_redirects response = session.get(url, timeout=self.timeout, allow_redirects=True) # Handle various status codes if response.status_code == 200: # Success - parse content logger.debug(f"Successfully read URL: {url}") return BeautifulSoup(response.text, 'html.parser') elif response.status_code == 202: # Accepted - retry with backoff logger.info(f"Status 202 for {url}, retrying with backoff...") backoff_times = [1.0, 2.0, 5.0, 10.0] for wait_time in backoff_times: time.sleep(wait_time) retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) if retry_response.status_code == 200: logger.debug(f"Successfully read URL after retry: {url}") return BeautifulSoup(retry_response.text, 'html.parser') elif retry_response.status_code != 202: break logger.warning(f"Failed to read URL after retries: {url}") return None elif response.status_code in [301, 302, 307, 308]: # Redirect - should be handled by allow_redirects=True logger.warning(f"Unexpected redirect status {response.status_code} for {url}") return None elif response.status_code == 403: # Forbidden - try with different user agent logger.warning(f"403 Forbidden for {url}, trying with different user agent...") headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36' session.headers.update(headers) retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) if retry_response.status_code == 200: logger.debug(f"Successfully read URL with different user agent: {url}") return BeautifulSoup(retry_response.text, 'html.parser') else: logger.error(f"Still getting {retry_response.status_code} for {url}") return None elif response.status_code == 429: # Rate limited - wait and retry logger.warning(f"Rate limited for {url}, waiting 30 seconds...") time.sleep(30) retry_response = session.get(url, timeout=self.timeout, allow_redirects=True) if retry_response.status_code == 200: logger.debug(f"Successfully read URL after rate limit: {url}") return BeautifulSoup(retry_response.text, 'html.parser') else: logger.error(f"Still getting {retry_response.status_code} after rate limit wait for {url}") return None else: # Other error status codes logger.error(f"HTTP {response.status_code} for {url}") return None except requests.exceptions.Timeout: logger.error(f"Timeout reading URL: {url}") return None except requests.exceptions.ConnectionError: logger.error(f"Connection error reading URL: {url}") return None except requests.exceptions.RequestException as e: logger.error(f"Request error reading URL {url}: {str(e)}") return None except Exception as e: logger.error(f"Unexpected error reading URL {url}: {str(e)}") return None def _extractTitle(self, soup: BeautifulSoup, url: str) -> str: """Extract the title from a webpage""" if not soup: return f"Error with {url}" # Extract title from title tag title_tag = soup.find('title') title = title_tag.text.strip() if title_tag else "No title" # Alternative: Also look for h1 tags if title tag is missing if title == "No title": h1_tag = soup.find('h1') if h1_tag: title = h1_tag.text.strip() return title def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 50000) -> str: """Extract the main content from an HTML page with enhanced content detection""" if not soup: return "" # Try to find main content elements in priority order with more selectors main_content = None content_selectors = [ 'main', 'article', '#content', '.content', '#main', '.main', '.post-content', '.entry-content', '.article-content', '.page-content', '[role="main"]', '.container', '.wrapper' ] for selector in content_selectors: content = soup.select_one(selector) if content: main_content = content logger.debug(f"Found main content using selector: {selector}") break # If no main content found, use the body if not main_content: main_content = soup.find('body') or soup logger.debug("Using body as main content") # Create a copy to avoid modifying the original content_copy = main_content.copy() # Remove elements that don't contribute to main content (less aggressive) elements_to_remove = [ 'script', 'style', 'noscript', 'nav', 'footer', 'header', 'aside', '.sidebar', '#sidebar', '.comments', '#comments', '.advertisement', '.ads', '.ad', '.banner', 'iframe', '.social-share', '.share-buttons', '.breadcrumb', '.breadcrumbs', '.pagination', '.related-posts', '.related-articles', '.newsletter', '.subscribe', '.signup', '.cookie-notice', '.privacy-notice', '.popup', '.modal', '.overlay' ] for selector in elements_to_remove: for element in content_copy.select(selector): element.extract() # Extract text content with better formatting text_content = content_copy.get_text(separator='\n', strip=True) # Clean up the text lines = text_content.split('\n') cleaned_lines = [] for line in lines: line = line.strip() if line and len(line) > 10: # Only keep meaningful lines cleaned_lines.append(line) # Join lines with proper spacing cleaned_content = '\n\n'.join(cleaned_lines) # If content is too short, try alternative extraction if len(cleaned_content) < 500: logger.debug("Content too short, trying alternative extraction...") # Try to extract from all paragraphs paragraphs = soup.find_all(['p', 'div', 'section']) alt_content = [] for p in paragraphs: text = p.get_text(strip=True) if text and len(text) > 20: # Only meaningful paragraphs alt_content.append(text) if alt_content: cleaned_content = '\n\n'.join(alt_content[:20]) # Limit to first 20 paragraphs # Limit to max_chars but preserve complete sentences if len(cleaned_content) > max_chars: # Try to cut at a sentence boundary sentences = cleaned_content.split('. ') truncated_content = "" for sentence in sentences: if len(truncated_content + sentence) < max_chars: truncated_content += sentence + ". " else: break cleaned_content = truncated_content.strip() logger.debug(f"Extracted {len(cleaned_content)} characters of content") return cleaned_content def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]: """Check basic accessibility features""" issues = [] warnings = [] # Check for alt text on images images_without_alt = soup.find_all('img', alt='') if images_without_alt: issues.append(f"Found {len(images_without_alt)} images without alt text") # Check for proper heading structure headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6']) if not headings: warnings.append("No headings found - poor document structure") # Check for form labels forms = soup.find_all('form') for form in forms: inputs = form.find_all('input') for input_elem in inputs: if input_elem.get('type') not in ['submit', 'button', 'hidden']: if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}): warnings.append("Form input without proper label") return { "status": "warning" if warnings else "pass", "issues": issues, "warnings": warnings } def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]: """Check basic SEO features""" issues = [] warnings = [] # Check for title tag title = soup.find('title') if not title: issues.append("Missing title tag") elif len(title.get_text()) < 10: warnings.append("Title tag is too short") elif len(title.get_text()) > 60: warnings.append("Title tag is too long") # Check for meta description meta_desc = soup.find('meta', attrs={'name': 'description'}) if not meta_desc: warnings.append("Missing meta description") elif meta_desc.get('content'): if len(meta_desc.get('content')) < 50: warnings.append("Meta description is too short") elif len(meta_desc.get('content')) > 160: warnings.append("Meta description is too long") # Check for h1 tag h1_tags = soup.find_all('h1') if not h1_tags: warnings.append("No H1 tag found") elif len(h1_tags) > 1: warnings.append("Multiple H1 tags found") return { "status": "warning" if warnings else "pass", "issues": issues, "warnings": warnings } def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: """Check basic performance indicators""" warnings = [] # Count images images = soup.find_all('img') if len(images) > 20: warnings.append(f"Many images found ({len(images)}) - may impact loading speed") # Check for external resources external_scripts = soup.find_all('script', src=True) external_styles = soup.find_all('link', rel='stylesheet') if len(external_scripts) > 10: warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed") if len(external_styles) > 5: warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed") return { "status": "warning" if warnings else "pass", "warnings": warnings, "metrics": { "images": len(images), "external_scripts": len(external_scripts), "external_styles": len(external_styles) } } def _detectJavaScriptRendering(self, soup: BeautifulSoup) -> bool: """Detect if a page likely requires JavaScript rendering""" if not soup: return False # Check for common indicators of JavaScript-rendered content indicators = [ # Angular, React, Vue indicators soup.find('div', {'ng-app': True}), soup.find('div', {'id': 'root'}), soup.find('div', {'id': 'app'}), soup.find('div', {'id': 'react-root'}), # SPA indicators soup.find('div', {'id': 'spa-root'}), soup.find('div', {'class': 'spa-container'}), # Modern framework indicators soup.find('div', {'data-reactroot': True}), soup.find('div', {'data-ng-controller': True}), # Empty content with scripts len(soup.get_text(strip=True)) < 100 and len(soup.find_all('script')) > 2 ] return any(indicators) def _extractMetaInformation(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]: """Extract meta information from the page""" meta_info = { "url": url, "title": self._extractTitle(soup, url), "description": "", "keywords": "", "author": "", "language": "", "robots": "", "viewport": "", "charset": "", "canonical": "" } # Extract meta tags meta_tags = soup.find_all('meta') for meta in meta_tags: name = meta.get('name', '').lower() property = meta.get('property', '').lower() content = meta.get('content', '') if name == 'description' or property == 'og:description': meta_info['description'] = content elif name == 'keywords': meta_info['keywords'] = content elif name == 'author': meta_info['author'] = content elif name == 'language': meta_info['language'] = content elif name == 'robots': meta_info['robots'] = content elif name == 'viewport': meta_info['viewport'] = content elif property == 'og:title': meta_info['title'] = content elif property == 'og:url': meta_info['canonical'] = content # Extract charset charset_meta = soup.find('meta', charset=True) if charset_meta: meta_info['charset'] = charset_meta.get('charset', '') # Extract canonical URL canonical_link = soup.find('link', rel='canonical') if canonical_link: meta_info['canonical'] = canonical_link.get('href', '') return meta_info def _getAlternativeApproaches(self, url: str, requires_js: bool, content_length: int) -> List[str]: """Get alternative approaches for sites that are difficult to crawl""" approaches = [] if requires_js: approaches.extend([ "Site requires JavaScript rendering - consider using a headless browser", "Try accessing the site's API endpoints directly", "Look for RSS feeds or sitemaps", "Check if the site has a mobile version that's easier to parse" ]) if content_length < 100: approaches.extend([ "Site may have anti-bot protection - try with different user agents", "Check if the site requires authentication", "Look for alternative URLs (www vs non-www, http vs https)", "Try accessing the site's robots.txt for crawling guidelines" ]) # Add general suggestions approaches.extend([ "Use the web.search action to find alternative sources", "Try the web.scrape action with specific CSS selectors", "Check if the site has a public API or data export" ]) return approaches async def _tryAdvancedAIWebResearch(self, action_type: str, parameters: Dict[str, Any]) -> Optional[Dict[str, Any]]: """ Try to get web research results using advanced AI first Args: action_type: Type of action ('crawl', 'scrape', or 'search') parameters: Action parameters Returns: Dict with AI results if successful, None if AI call fails """ try: # Create appropriate prompt based on action type if action_type == "crawl": prompt = self._createCrawlAIPrompt(parameters) elif action_type == "scrape": prompt = self._createScrapeAIPrompt(parameters) elif action_type == "search": prompt = self._createSearchAIPrompt(parameters) else: logger.warning(f"Unknown action type for AI research: {action_type}") return None # Try advanced AI call if hasattr(self.service, 'callAiTextAdvanced'): logger.info(f"Attempting advanced AI web research for {action_type}") response = await self.service.callAiTextAdvanced(prompt) # Parse the AI response parsed_result = self._parseAIWebResponse(response, action_type) if parsed_result: logger.info(f"Advanced AI web research successful for {action_type}") return parsed_result else: logger.warning(f"Failed to parse AI response for {action_type}") return None else: logger.warning("Service does not have callAiTextAdvanced method") return None except Exception as e: logger.warning(f"Advanced AI web research failed for {action_type}: {str(e)}") return None def _createCrawlAIPrompt(self, parameters: Dict[str, Any]) -> str: """Create AI prompt for web crawling""" urls = parameters.get("urls", []) maxDepth = parameters.get("maxDepth", 2) includeImages = parameters.get("includeImages", False) followLinks = parameters.get("followLinks", True) prompt = f""" You are an advanced AI research assistant with comprehensive knowledge about websites, companies, and online content. Please provide detailed information about the following URLs based on your extensive training data and knowledge. URLs to research: {urls} Max depth: {maxDepth} Include images: {includeImages} Follow links: {followLinks} For each URL, please provide comprehensive information including: 1. Company/organization information and background 2. Main business activities and services 3. Key personnel and leadership 4. Contact information and locations 5. Recent news and developments 6. Industry analysis and market position 7. Related companies and partnerships 8. Website structure and key pages 9. Business model and revenue streams 10. Regulatory compliance and certifications For each URL, provide: - url: The original URL - title: Company/organization name - content: Comprehensive description and analysis - content_length: Number of characters in content - meta_info: Business information object - links: Related companies and important connections - images: Company logos or key visuals if known - requires_javascript: Boolean (usually false for static info) - alternative_approaches: Additional research suggestions - timestamp: Current timestamp Return the results in this exact JSON format: {{ "urls": {urls}, "maxDepth": {maxDepth}, "includeImages": {includeImages}, "followLinks": {followLinks}, "crawlResults": [ {{ "url": "url_here", "depth": {maxDepth}, "followLinks": {followLinks}, "extractContent": true, "title": "company_name", "content": "comprehensive_company_analysis", "content_length": 1234, "meta_info": {{ "url": "url_here", "title": "company_name", "description": "business_description", "keywords": "industry_keywords", "author": "company_info", "language": "language_code", "robots": "robots_info", "viewport": "viewport_info", "charset": "charset_info", "canonical": "canonical_url" }}, "links": [ {{ "url": "related_company_url", "text": "company_name" }} ], "images": [ {{ "src": "logo_url", "alt": "company_logo", "title": "company_name", "width": "width_value", "height": "height_value" }} ], "requires_javascript": false, "alternative_approaches": ["approach1", "approach2"], "timestamp": "2024-01-01T00:00:00Z" }} ], "summary": {{ "total_urls": {len(urls)}, "successful_crawls": 0, "failed_crawls": 0, "total_content_chars": 0 }}, "timestamp": "2024-01-01T00:00:00Z" }} Please provide accurate, comprehensive information about each company/organization based on your knowledge. If you don't have specific information about a URL, provide general industry analysis and suggest alternative research approaches. """ return prompt def _createScrapeAIPrompt(self, parameters: Dict[str, Any]) -> str: """Create AI prompt for web scraping""" url = parameters.get("url") selectors = parameters.get("selectors", {}) format = parameters.get("format", "json") prompt = f""" You are an advanced AI research assistant with comprehensive knowledge about websites, companies, and online content. Please provide detailed information about the following URL and the specific data requested based on your extensive training data and knowledge. URL to research: {url} Data selectors: {selectors} Output format: {format} Please provide comprehensive information including: 1. Company/organization background and history 2. Business activities and services offered 3. Key personnel and leadership information 4. Financial information and performance data 5. Market position and competitive analysis 6. Recent news and developments 7. Contact information and locations 8. Industry trends and insights 9. Related companies and partnerships 10. Regulatory and compliance information For each data selector requested, provide relevant information in the specified format (text, html, or json). Return the results in this exact JSON format: {{ "url": "{url}", "selectors": {selectors}, "format": "{format}", "scrapedData": {{ "url": "{url}", "selectors": {selectors}, "format": "{format}", "content": {{ "company_info": ["comprehensive_company_analysis"], "business_activities": ["detailed_business_description"], "leadership": ["key_personnel_information"], "financial_data": ["financial_performance_analysis"], "market_position": ["competitive_analysis"], "recent_news": ["latest_developments"], "contact_info": ["contact_details"], "industry_insights": ["market_trends"], "partnerships": ["related_companies"], "compliance": ["regulatory_information"] }}, "timestamp": "2024-01-01T00:00:00Z" }}, "timestamp": "2024-01-01T00:00:00Z" }} Please provide accurate, comprehensive information about the company/organization based on your knowledge. If you don't have specific information about the URL, provide general industry analysis and suggest alternative research approaches. """ return prompt def _createSearchAIPrompt(self, parameters: Dict[str, Any]) -> str: """Create AI prompt for web search""" query = parameters.get("query") engine = parameters.get("engine", "google") maxResults = parameters.get("maxResults", 10) filter = parameters.get("filter") prompt = f""" You are an advanced AI research assistant with comprehensive knowledge about companies, industries, and business information. Please provide detailed information about the following search query based on your extensive training data and knowledge. Search query: {query} Search engine: {engine} Max results: {maxResults} Filter: {filter} Please provide comprehensive research results including: 1. Relevant company/organization information 2. Industry analysis and market insights 3. Key personnel and leadership details 4. Business activities and services 5. Financial performance and metrics 6. Recent news and developments 7. Competitive landscape analysis 8. Market trends and opportunities 9. Regulatory and compliance information 10. Related companies and partnerships For each search result, provide: - title: Company/organization name - url: Official website or primary source - snippet: Brief description and key highlights - content: Comprehensive analysis and insights Return the results in this exact JSON format: {{ "query": "{query}", "engine": "{engine}", "maxResults": {maxResults}, "filter": "{filter}", "searchResults": {{ "query": "{query}", "maxResults": {maxResults}, "results": [ {{ "title": "company_name", "url": "official_website", "snippet": "brief_description", "content": "comprehensive_analysis" }} ], "totalFound": 0, "timestamp": "2024-01-01T00:00:00Z" }}, "timestamp": "2024-01-01T00:00:00Z" }} Please provide accurate, comprehensive information about the search query based on your knowledge. If you don't have specific information about the query, provide general industry analysis and suggest alternative research approaches. """ return prompt def _parseAIWebResponse(self, response: str, action_type: str) -> Optional[Dict[str, Any]]: """Parse AI response into structured data""" try: # Extract JSON from response json_start = response.find('{') json_end = response.rfind('}') + 1 if json_start == -1 or json_end == 0: logger.warning(f"No JSON found in AI response: {response}") return None json_str = response[json_start:json_end] parsed_data = json.loads(json_str) # Validate basic structure based on action type if action_type == "crawl": if "crawlResults" not in parsed_data: logger.warning("Invalid crawl response structure") return None elif action_type == "scrape": if "scrapedData" not in parsed_data: logger.warning("Invalid scrape response structure") return None elif action_type == "search": if "searchResults" not in parsed_data: logger.warning("Invalid search response structure") return None return parsed_data except json.JSONDecodeError as e: logger.warning(f"Failed to parse AI response JSON: {str(e)}") return None except Exception as e: logger.warning(f"Error parsing AI response: {str(e)}") return None @action async def crawl(self, parameters: Dict[str, Any]) -> ActionResult: """ Crawl web pages and extract content with enhanced error handling and content detection Parameters: urls (List[str]): List of URLs to crawl maxDepth (int, optional): Maximum crawl depth (default: 2) includeImages (bool, optional): Whether to include images (default: False) followLinks (bool, optional): Whether to follow links (default: True) expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: urls = parameters.get("urls") maxDepth = parameters.get("maxDepth", 2) includeImages = parameters.get("includeImages", False) followLinks = parameters.get("followLinks", True) expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not urls: return self._createResult( success=False, data={}, error="URLs are required" ) # Try advanced AI research first ai_result = await self._tryAdvancedAIWebResearch("crawl", parameters) if ai_result: logger.info("Using advanced AI web research for crawl") # Reconstruct the result data from the AI response result_data = { "urls": ai_result.get("urls", []), "maxDepth": ai_result.get("maxDepth", 2), "includeImages": ai_result.get("includeImages", False), "followLinks": ai_result.get("followLinks", True), "crawlResults": ai_result.get("crawlResults", []), "summary": ai_result.get("summary", {}), "timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat()) } return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json", "documentData": result_data, "mimeType": "application/json" } ] } ) else: logger.info("Advanced AI web research failed, falling back to regular web crawling") # Crawl each URL crawl_results = [] for url in urls: try: logger.info(f"Crawling URL: {url}") # Read the URL with enhanced error handling soup = self._readUrl(url) if not soup: logger.error(f"Failed to read URL: {url}") crawl_results.append({ "error": "Failed to read URL - check if the site is accessible and not blocking crawlers", "url": url, "suggestions": [ "Try accessing the URL directly in a browser", "Check if the site requires JavaScript", "Verify the URL is correct and accessible" ] }) continue # Extract comprehensive information title = self._extractTitle(soup, url) content = self._extractMainContent(soup) meta_info = self._extractMetaInformation(soup, url) # Check if content is meaningful content_length = len(content) if content_length < 100: logger.warning(f"Very little content extracted from {url} ({content_length} chars)") crawl_results.append({ "url": url, "title": title, "content": content, "content_length": content_length, "warning": "Very little content extracted - site may require JavaScript or have anti-bot protection", "meta_info": meta_info, "timestamp": datetime.now(UTC).isoformat() }) continue # Extract links if requested links = [] if followLinks: for link in soup.find_all('a', href=True): href = link.get('href') if href and href.startswith(('http://', 'https://')): link_text = link.get_text(strip=True) if link_text: # Only include links with text links.append({ 'url': href, 'text': link_text[:100] }) # Extract images if requested images = [] if includeImages: for img in soup.find_all('img', src=True): src = img.get('src') if src: images.append({ 'src': src, 'alt': img.get('alt', ''), 'title': img.get('title', ''), 'width': img.get('width', ''), 'height': img.get('height', '') }) # Check for JavaScript rendering requirements requires_js = self._detectJavaScriptRendering(soup) # Get alternative approaches if needed alternative_approaches = self._getAlternativeApproaches(url, requires_js, content_length) crawl_results.append({ "url": url, "depth": maxDepth, "followLinks": followLinks, "extractContent": True, "title": title, "content": content, "content_length": content_length, "meta_info": meta_info, "links": links[:20], # Limit to first 20 links "images": images[:20], # Limit to first 20 images "requires_javascript": requires_js, "alternative_approaches": alternative_approaches, "timestamp": datetime.now(UTC).isoformat() }) logger.info(f"Successfully crawled {url} - extracted {content_length} characters") except Exception as e: logger.error(f"Error crawling web page {url}: {str(e)}") crawl_results.append({ "error": str(e), "url": url, "suggestions": [ "Check if the URL is accessible", "Try with a different user agent", "Verify the site doesn't block automated access" ] }) # Create result data result_data = { "urls": urls, "maxDepth": maxDepth, "includeImages": includeImages, "followLinks": followLinks, "crawlResults": crawl_results, "summary": { "total_urls": len(urls), "successful_crawls": len([r for r in crawl_results if "error" not in r]), "failed_crawls": len([r for r in crawl_results if "error" in r]), "total_content_chars": sum([r.get("content_length", 0) for r in crawl_results if "content_length" in r]) }, "timestamp": datetime.now(UTC).isoformat() } # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] } ) except Exception as e: logger.error(f"Error crawling web pages: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def scrape(self, parameters: Dict[str, Any]) -> ActionResult: """ Scrape specific data from web pages Parameters: url (str): URL to scrape selectors (Dict[str, str]): CSS selectors for data extraction format (str, optional): Output format (default: "json") expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: url = parameters.get("url") selectors = parameters.get("selectors") format = parameters.get("format", "json") expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not url or not selectors: return self._createResult( success=False, data={}, error="URL and selectors are required" ) # Try advanced AI research first ai_result = await self._tryAdvancedAIWebResearch("scrape", parameters) if ai_result: logger.info("Using advanced AI web research for scrape") # Reconstruct the result data from the AI response result_data = { "url": ai_result.get("url"), "selectors": ai_result.get("selectors"), "format": ai_result.get("format"), "scrapedData": ai_result.get("scrapedData"), "timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat()) } return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json", "documentData": result_data, "mimeType": "application/json" } ] } ) else: logger.info("Advanced AI web research failed, falling back to regular web scraping") # Read the URL soup = self._readUrl(url) if not soup: return self._createResult( success=False, data={}, error="Failed to read URL" ) extracted_content = {} if selectors: # Extract content using provided selectors for selector_name, selector in selectors.items(): elements = soup.select(selector) if elements: if format == "text": extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] elif format == "html": extracted_content[selector_name] = [str(elem) for elem in elements] else: extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements] else: extracted_content[selector_name] = [] else: # Auto-extract common elements extracted_content = { "title": self._extractTitle(soup, url), "main_content": self._extractMainContent(soup), "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])], "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))], "images": [img.get('src') for img in soup.find_all('img', src=True)] } scrape_result = { "url": url, "selectors": selectors, "format": format, "content": extracted_content, "timestamp": datetime.now(UTC).isoformat() } # Create result data result_data = { "url": url, "selectors": selectors, "format": format, "scrapedData": scrape_result, "timestamp": datetime.now(UTC).isoformat() } # Determine output format based on expected formats output_extension = f".{format}" # Default to format parameter output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", f".{format}") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info(f"No expected format specified, using format parameter: {format}") return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] } ) except Exception as e: logger.error(f"Error scraping web page: {str(e)}") return self._createResult( success=False, data={}, error=str(e) ) @action async def search(self, parameters: Dict[str, Any]) -> ActionResult: """ Search web content Parameters: query (str): Search query engine (str, optional): Search engine to use (default: "google") maxResults (int, optional): Maximum number of results (default: 10) filter (str, optional): Additional search filters expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description """ try: query = parameters.get("query") engine = parameters.get("engine", "google") maxResults = parameters.get("maxResults", 10) filter = parameters.get("filter") expectedDocumentFormats = parameters.get("expectedDocumentFormats", []) if not query: return self._createResult( success=False, data={}, error="Search query is required" ) # Try advanced AI research first ai_result = await self._tryAdvancedAIWebResearch("search", parameters) if ai_result: logger.info("Using advanced AI web research for search") # Reconstruct the result data from the AI response result_data = { "query": ai_result.get("query"), "engine": ai_result.get("engine"), "maxResults": ai_result.get("maxResults"), "filter": ai_result.get("filter"), "searchResults": ai_result.get("searchResults"), "timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat()) } return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json", "documentData": result_data, "mimeType": "application/json" } ] } ) else: logger.info("Advanced AI web research failed, falling back to regular web search") # Search web content using Google search via SerpAPI try: if not self.srcApikey: search_result = { "error": "SerpAPI key not configured", "query": query } else: # Get user language from service center if available userLanguage = "en" # Default language if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'): userLanguage = self.service.user.language # Format the search request for SerpAPI params = { "engine": self.srcEngine, "q": query, "api_key": self.srcApikey, "num": min(maxResults, self.maxResults), # Number of results to return "hl": userLanguage # User language } # Make the API request response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout) response.raise_for_status() # Parse JSON response search_results = response.json() # Extract organic results results = [] if "organic_results" in search_results: for result in search_results["organic_results"][:maxResults]: # Extract title title = result.get("title", "No title") # Extract URL url = result.get("link", "No URL") # Extract snippet snippet = result.get("snippet", "No description") # Get actual page content try: targetPageSoup = self._readUrl(url) content = self._extractMainContent(targetPageSoup) except Exception as e: logger.warning(f"Error extracting content from {url}: {str(e)}") content = f"Error extracting content: {str(e)}" results.append({ 'title': title, 'url': url, 'snippet': snippet, 'content': content }) # Limit number of results if len(results) >= maxResults: break else: logger.warning(f"No organic results found in SerpAPI response for: {query}") search_result = { "query": query, "maxResults": maxResults, "results": results, "totalFound": len(results), "timestamp": datetime.now(UTC).isoformat() } except Exception as e: logger.error(f"Error searching web: {str(e)}") search_result = { "error": str(e), "query": query } # Create result data result_data = { "query": query, "engine": engine, "maxResults": maxResults, "filter": filter, "searchResults": search_result, "timestamp": datetime.now(UTC).isoformat() } # Determine output format based on expected formats output_extension = ".json" # Default output_mime_type = "application/json" # Default if expectedDocumentFormats and len(expectedDocumentFormats) > 0: # Use the first expected format expected_format = expectedDocumentFormats[0] output_extension = expected_format.get("extension", ".json") output_mime_type = expected_format.get("mimeType", "application/json") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") else: logger.info("No expected format specified, using default .json format") return self._createResult( success=True, data={ "documents": [ { "documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}", "documentData": result_data, "mimeType": output_mime_type } ] } ) except Exception as e: logger.error(f"Error searching web: {str(e)}") return self._createResult( success=False, data={}, error=str(e) )