"""
|
|
Web operations method module.
|
|
Handles web scraping, crawling, and search operations.
|
|
"""
|
|
|
|
import logging
|
|
import requests
|
|
import json
|
|
import re
|
|
import copy
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, UTC
|
|
from urllib.parse import urlparse, urljoin
|
|
import time
|
|
import random
|
|
from bs4 import BeautifulSoup
|
|
import os
|
|
|
|
# Selenium imports for JavaScript-heavy pages
|
|
from selenium import webdriver
|
|
from selenium.webdriver.chrome.options import Options
|
|
from selenium.common.exceptions import WebDriverException
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from modules.chat.methodBase import MethodBase, action
|
|
from modules.interfaces.interfaceChatModel import ActionResult
|
|
from modules.shared.configuration import APP_CONFIG
|
|
from modules.shared.timezoneUtils import get_utc_timestamp
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MethodWeb(MethodBase):
    """
    Web method implementation for web operations.

    - web.search: Uses SerpAPI (default engine: Google) to find relevant URLs for a
      query. Returns only a plain list of result URLs; it does NOT fetch or extract
      page content.
    - web.crawl: Fetches and extracts the main content from a list of URLs provided
      in a referenced document. Tries a headless browser first for JavaScript-heavy
      pages, with a plain-requests fallback.
    - web.scrape: Extracts specific data from a single page using CSS selectors, or
      auto-extracts common elements when no selectors are given.
    """

    def __init__(self, serviceCenter: Any):
        super().__init__(serviceCenter)
        self.name = "web"
        self.description = "Handle web operations like search and crawling"
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30

    def _readUrl(self, url: str) -> Optional[BeautifulSoup]:
        """Read a URL and return a BeautifulSoup parser for the content, with enhanced error handling.

        Returns None on any failure.
        """
        if not url or not url.startswith(('http://', 'https://')):
            logger.error(f"Invalid URL: {url}")
            return None

        # Enhanced headers to mimic a real browser
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Cache-Control': 'max-age=0'
        }

        try:
            # Use a session for better connection handling
            session = requests.Session()
            session.headers.update(headers)

            # Initial request with redirects enabled
            response = session.get(url, timeout=self.timeout, allow_redirects=True)

            # Handle various status codes
            if response.status_code == 200:
                # Success - parse content
                logger.debug(f"Successfully read URL: {url}")
                return BeautifulSoup(response.text, 'html.parser')

            elif response.status_code == 202:
                # Accepted - retry with backoff
                logger.info(f"Status 202 for {url}, retrying with backoff...")
                backoff_times = [1.0, 2.0, 5.0, 10.0]

                for wait_time in backoff_times:
                    time.sleep(wait_time)
                    retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)

                    if retry_response.status_code == 200:
                        logger.debug(f"Successfully read URL after retry: {url}")
                        return BeautifulSoup(retry_response.text, 'html.parser')
                    elif retry_response.status_code != 202:
                        break

                logger.warning(f"Failed to read URL after retries: {url}")
                return None

            elif response.status_code in [301, 302, 307, 308]:
                # Redirects should already be handled by allow_redirects=True
                logger.warning(f"Unexpected redirect status {response.status_code} for {url}")
                return None

            elif response.status_code == 403:
                # Forbidden - retry once with a different user agent
                logger.warning(f"403 Forbidden for {url}, trying with different user agent...")
                headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
                session.headers.update(headers)

                retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
                if retry_response.status_code == 200:
                    logger.debug(f"Successfully read URL with different user agent: {url}")
                    return BeautifulSoup(retry_response.text, 'html.parser')
                else:
                    logger.error(f"Still getting {retry_response.status_code} for {url}")
                    return None

            elif response.status_code == 429:
                # Rate limited - wait and retry once
                logger.warning(f"Rate limited for {url}, waiting 30 seconds...")
                time.sleep(30)
                retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
                if retry_response.status_code == 200:
                    logger.debug(f"Successfully read URL after rate limit: {url}")
                    return BeautifulSoup(retry_response.text, 'html.parser')
                else:
                    logger.error(f"Still getting {retry_response.status_code} after rate limit wait for {url}")
                    return None

            else:
                # Other error status codes
                logger.error(f"HTTP {response.status_code} for {url}")
                return None

        except requests.exceptions.Timeout:
            logger.error(f"Timeout reading URL: {url}")
            return None
        except requests.exceptions.ConnectionError:
            logger.error(f"Connection error reading URL: {url}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request error reading URL {url}: {str(e)}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error reading URL {url}: {str(e)}")
            return None

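    # Internal usage sketch: callers treat a None return as "page unavailable",
    # mirroring how crawl() guards before extraction:
    #
    #   soup = self._readUrl(url)
    #   content = self._extractMainContent(soup) if soup else ""
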
    def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
        """Extract the title from a webpage"""
        if not soup:
            return f"Error with {url}"

        # Extract title from the title tag
        title_tag = soup.find('title')
        title = title_tag.text.strip() if title_tag else "No title"

        # Fall back to the first h1 tag if the title tag is missing
        if title == "No title":
            h1_tag = soup.find('h1')
            if h1_tag:
                title = h1_tag.text.strip()

        return title

    def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 50000) -> str:
        """Extract the main content from an HTML page with enhanced content detection"""
        if not soup:
            return ""

        # Try to find main content elements, in priority order
        main_content = None
        content_selectors = [
            'main',
            'article',
            '#content',
            '.content',
            '#main',
            '.main',
            '.post-content',
            '.entry-content',
            '.article-content',
            '.page-content',
            '[role="main"]',
            '.container',
            '.wrapper'
        ]

        for selector in content_selectors:
            content = soup.select_one(selector)
            if content:
                main_content = content
                logger.debug(f"Found main content using selector: {selector}")
                break

        # If no main content found, use the body
        if not main_content:
            main_content = soup.find('body') or soup
            logger.debug("Using body as main content")

        # Work on a copy so the original soup is left untouched
        try:
            content_copy = copy.copy(main_content)
        except Exception:
            content_copy = main_content

        # Remove elements that don't contribute to the main content
        elements_to_remove = [
            'script', 'style', 'noscript',
            'nav', 'footer', 'header', 'aside',
            '.sidebar', '#sidebar', '.comments', '#comments',
            '.advertisement', '.ads', '.ad', '.banner',
            'iframe', '.social-share', '.share-buttons',
            '.breadcrumb', '.breadcrumbs', '.pagination',
            '.related-posts', '.related-articles',
            '.newsletter', '.subscribe', '.signup',
            '.cookie-notice', '.privacy-notice',
            '.popup', '.modal', '.overlay'
        ]

        for selector in elements_to_remove:
            for element in content_copy.select(selector):
                element.extract()

        # Extract text content with better formatting
        text_content = content_copy.get_text(separator='\n', strip=True)

        # Clean up the text, keeping only meaningful lines
        lines = text_content.split('\n')
        cleaned_lines = [line.strip() for line in lines if len(line.strip()) > 10]

        # Join lines with proper spacing
        cleaned_content = '\n\n'.join(cleaned_lines)

        # If the content is too short, try alternative extraction
        if len(cleaned_content) < 500:
            logger.debug("Content too short, trying alternative extraction...")

            # Extract from all paragraphs and paragraph-like containers
            paragraphs = soup.find_all(['p', 'div', 'section'])
            alt_content = []

            for p in paragraphs:
                text = p.get_text(strip=True)
                if text and len(text) > 20:  # Only meaningful paragraphs
                    alt_content.append(text)

            if alt_content:
                cleaned_content = '\n\n'.join(alt_content[:20])  # Limit to first 20 paragraphs

        # Limit to max_chars, cutting at a sentence boundary where possible
        if len(cleaned_content) > max_chars:
            sentences = cleaned_content.split('. ')
            truncated_content = ""

            for sentence in sentences:
                if len(truncated_content + sentence) < max_chars:
                    truncated_content += sentence + ". "
                else:
                    break

            cleaned_content = truncated_content.strip()

        logger.debug(f"Extracted {len(cleaned_content)} characters of content")
        return cleaned_content

    def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic accessibility features"""
        issues = []
        warnings = []

        # Check for alt text on images (missing or empty alt attribute)
        images_without_alt = [img for img in soup.find_all('img') if not img.get('alt')]
        if images_without_alt:
            issues.append(f"Found {len(images_without_alt)} images without alt text")

        # Check for proper heading structure
        headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
        if not headings:
            warnings.append("No headings found - poor document structure")

        # Check for form labels
        forms = soup.find_all('form')
        for form in forms:
            inputs = form.find_all('input')
            for input_elem in inputs:
                if input_elem.get('type') not in ['submit', 'button', 'hidden']:
                    if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
                        warnings.append("Form input without proper label")

        return {
            "status": "fail" if issues else ("warning" if warnings else "pass"),
            "issues": issues,
            "warnings": warnings
        }

    def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
        """Check basic SEO features"""
        issues = []
        warnings = []

        # Check for title tag
        title = soup.find('title')
        if not title:
            issues.append("Missing title tag")
        elif len(title.get_text()) < 10:
            warnings.append("Title tag is too short")
        elif len(title.get_text()) > 60:
            warnings.append("Title tag is too long")

        # Check for meta description
        meta_desc = soup.find('meta', attrs={'name': 'description'})
        if not meta_desc:
            warnings.append("Missing meta description")
        elif meta_desc.get('content'):
            if len(meta_desc.get('content')) < 50:
                warnings.append("Meta description is too short")
            elif len(meta_desc.get('content')) > 160:
                warnings.append("Meta description is too long")

        # Check for h1 tag
        h1_tags = soup.find_all('h1')
        if not h1_tags:
            warnings.append("No H1 tag found")
        elif len(h1_tags) > 1:
            warnings.append("Multiple H1 tags found")

        return {
            "status": "fail" if issues else ("warning" if warnings else "pass"),
            "issues": issues,
            "warnings": warnings
        }

    def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Check basic performance indicators"""
        warnings = []

        # Count images
        images = soup.find_all('img')
        if len(images) > 20:
            warnings.append(f"Many images found ({len(images)}) - may impact loading speed")

        # Check for external resources
        external_scripts = soup.find_all('script', src=True)
        external_styles = soup.find_all('link', rel='stylesheet')

        if len(external_scripts) > 10:
            warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")

        if len(external_styles) > 5:
            warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")

        return {
            "status": "warning" if warnings else "pass",
            "warnings": warnings,
            "metrics": {
                "images": len(images),
                "external_scripts": len(external_scripts),
                "external_styles": len(external_styles)
            }
        }

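    # Sketch (hypothetical aggregation, not a registered action): the three
    # private checks share a parsed soup and can be combined into one report:
    #
    #   report = {
    #       "accessibility": self._checkAccessibility(soup),
    #       "seo": self._checkSEO(soup),
    #       "performance": self._checkPerformance(soup, url),
    #   }
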
    def _detectJavaScriptRendering(self, soup: BeautifulSoup) -> bool:
        """Detect if a page likely requires JavaScript rendering"""
        if not soup:
            return False

        # Check for common indicators of JavaScript-rendered content
        indicators = [
            # Angular, React, Vue indicators
            soup.find('div', {'ng-app': True}),
            soup.find('div', {'id': 'root'}),
            soup.find('div', {'id': 'app'}),
            soup.find('div', {'id': 'react-root'}),

            # SPA indicators
            soup.find('div', {'id': 'spa-root'}),
            soup.find('div', {'class': 'spa-container'}),

            # Modern framework indicators
            soup.find('div', {'data-reactroot': True}),
            soup.find('div', {'data-ng-controller': True}),

            # Nearly empty content alongside multiple scripts
            len(soup.get_text(strip=True)) < 100 and len(soup.find_all('script')) > 2
        ]

        return any(indicators)

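    # Illustrative example (hypothetical markup): a typical SPA shell such as
    #
    #   <html><body><div id="root"></div>
    #   <script src="runtime.js"></script><script src="vendor.js"></script>
    #   <script src="app.js"></script></body></html>
    #
    # trips both the 'root' indicator and the empty-text-plus-scripts heuristic.
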
    def _extractMetaInformation(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
        """Extract meta information from the page"""
        meta_info = {
            "url": url,
            "title": self._extractTitle(soup, url),
            "description": "",
            "keywords": "",
            "author": "",
            "language": "",
            "robots": "",
            "viewport": "",
            "charset": "",
            "canonical": ""
        }

        # Extract meta tags
        meta_tags = soup.find_all('meta')
        for meta in meta_tags:
            name = meta.get('name', '').lower()
            prop = meta.get('property', '').lower()  # avoid shadowing the builtin 'property'
            content = meta.get('content', '')

            if name == 'description' or prop == 'og:description':
                meta_info['description'] = content
            elif name == 'keywords':
                meta_info['keywords'] = content
            elif name == 'author':
                meta_info['author'] = content
            elif name == 'language':
                meta_info['language'] = content
            elif name == 'robots':
                meta_info['robots'] = content
            elif name == 'viewport':
                meta_info['viewport'] = content
            elif prop == 'og:title':
                meta_info['title'] = content
            elif prop == 'og:url':
                meta_info['canonical'] = content

        # Extract charset
        charset_meta = soup.find('meta', charset=True)
        if charset_meta:
            meta_info['charset'] = charset_meta.get('charset', '')

        # Extract canonical URL (overrides og:url if present)
        canonical_link = soup.find('link', rel='canonical')
        if canonical_link:
            meta_info['canonical'] = canonical_link.get('href', '')

        return meta_info

    def _getAlternativeApproaches(self, url: str, requires_js: bool, content_length: int) -> List[str]:
        """Get alternative approaches for sites that are difficult to crawl"""
        approaches = []

        if requires_js:
            approaches.extend([
                "Site requires JavaScript rendering - consider using a headless browser",
                "Try accessing the site's API endpoints directly",
                "Look for RSS feeds or sitemaps",
                "Check if the site has a mobile version that's easier to parse"
            ])

        if content_length < 100:
            approaches.extend([
                "Site may have anti-bot protection - try with different user agents",
                "Check if the site requires authentication",
                "Look for alternative URLs (www vs non-www, http vs https)",
                "Try accessing the site's robots.txt for crawling guidelines"
            ])

        # Add general suggestions
        approaches.extend([
            "Use the web.search action to find alternative sources",
            "Try the web.scrape action with specific CSS selectors",
            "Check if the site has a public API or data export"
        ])

        return approaches

    @action
    async def search(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Perform a web search and output a document (default: .txt) containing a plain
        list of result URLs, one per line.

        Parameters:
            query (str): Search query to perform
            maxResults (int, optional): Maximum number of results (default: 10, capped by configuration)
            filter (str, optional): Filter criteria for search results
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            query = parameters.get("query")
            max_results = parameters.get("maxResults", 10)
            filter_param = parameters.get("filter")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not query:
                return ActionResult.failure(error="Search query is required")

            if not self.srcApikey:
                return ActionResult.failure(error="SerpAPI key not configured")

            userLanguage = "en"
            if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'):
                userLanguage = self.service.user.language

            params = {
                "engine": self.srcEngine,
                "q": query,
                "api_key": self.srcApikey,
                "num": min(max_results, self.maxResults),
                "hl": userLanguage
            }

            if filter_param:
                params["filter"] = filter_param

            response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
            response.raise_for_status()
            search_results = response.json()
            results = []

            if "organic_results" in search_results:
                results = search_results["organic_results"][:max_results]

            # SerpAPI organic results expose each result URL under the 'link' key
            # (fall back to 'url' for engines that use that name)
            urls = []
            for item in results:
                link = item.get('link') or item.get('url')
                if isinstance(link, str):
                    urls.append(link)
            url_list_str = "\n".join(urls)

            # Determine output format based on expected formats
            output_extension = ".txt"  # Default
            output_mime_type = "text/plain"  # Default

            if expectedDocumentFormats:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", ".txt")
                output_mime_type = expected_format.get("mimeType", "text/plain")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info("No expected format specified, using default .txt format")

            # Create result data
            result_data = {
                "query": query,
                "maxResults": max_results,
                "filter": filter_param,
                "totalResults": len(urls),
                "urls": urls,
                "urlList": url_list_str,
                "timestamp": get_utc_timestamp()
            }

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_search_{get_utc_timestamp()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )

        except Exception as e:
            logger.error(f"Error searching web: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )

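    # Usage sketch (illustrative; assumes the surrounding framework dispatches
    # registered actions and that a SerpAPI key is configured):
    #
    #   result = await method_web.search({"query": "python web scraping", "maxResults": 5})
    #   if result.success:
    #       print(result.documents[0]["documentData"]["urlList"])
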
    def _seleniumGetSoup(self, url: str) -> Optional[BeautifulSoup]:
        """Use Selenium to fetch a JavaScript-heavy page and return its parsed soup."""
        options = Options()
        # 'options.headless = True' is deprecated/removed in recent Selenium releases
        options.add_argument('--headless=new')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument(f'user-agent={self.user_agent}')
        driver = None
        try:
            driver = webdriver.Chrome(options=options)
            driver.set_page_load_timeout(self.timeout)
            driver.get(url)
            # Wait for the body to load
            WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.TAG_NAME, "body")))
            return BeautifulSoup(driver.page_source, 'html.parser')
        except WebDriverException as e:
            logger.warning(f"Selenium failed for {url}: {str(e)}")
            return None
        except Exception as e:
            logger.warning(f"Selenium error for {url}: {str(e)}")
            return None
        finally:
            # Always release the browser, even on failure
            if driver is not None:
                try:
                    driver.quit()
                except Exception:
                    pass

    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl a list of URLs provided in a document (.txt) with URLs separated by newline, comma, or semicolon.

        Parameters:
            document (str): Path to the document containing the URL list
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            document = parameters.get("document")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not document:
                return ActionResult.failure(error="No document with URL list provided.")

            # Read the document content
            with open(document, "r", encoding="utf-8") as f:
                content = f.read()

            # Split URLs by newline, comma, or semicolon
            urls = re.split(r'[\n,;]+', content)
            urls = [u.strip() for u in urls if u.strip()]

            if not urls:
                return ActionResult.failure(error="No valid URLs provided in the document.")

            crawl_results = []
            for url in urls:
                try:
                    logger.info(f"Crawling URL: {url}")
                    # Try Selenium first (handles JavaScript-heavy pages)
                    soup = self._seleniumGetSoup(url)
                    if soup is None:
                        # Fallback to plain requests/BeautifulSoup
                        soup = self._readUrl(url)

                    page_content = self._extractMainContent(soup) if soup else ""
                    # Extract the title from the page soup, not from the extracted text
                    title = self._extractTitle(soup, url) if soup else "No title"
                    meta_info = {"url": url, "title": title}
                    content_length = len(page_content)

                    crawl_results.append({
                        "url": url,
                        "title": title,
                        "content": page_content,
                        "content_length": content_length,
                        "meta_info": meta_info,
                        "timestamp": get_utc_timestamp()
                    })
                    logger.info(f"Successfully crawled {url} - extracted {content_length} characters")

                except Exception as e:
                    logger.error(f"Error crawling web page {url}: {str(e)}")
                    crawl_results.append({
                        "error": str(e),
                        "url": url,
                        "suggestions": [
                            "Check if the URL is accessible",
                            "Try with a different user agent",
                            "Verify the site doesn't block automated access"
                        ]
                    })

            # Determine output format based on expected formats
            output_extension = ".json"  # Default
            output_mime_type = "application/json"  # Default

            if expectedDocumentFormats:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", ".json")
                output_mime_type = expected_format.get("mimeType", "application/json")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info("No expected format specified, using default .json format")

            result_data = {
                "urls": urls,
                "maxDepth": 1,  # Simplified crawl
                "includeImages": False,
                "followLinks": False,  # Only the provided URLs are fetched; links are not followed
                "crawlResults": crawl_results,
                "summary": {
                    "total_urls": len(urls),
                    "successful_crawls": len([r for r in crawl_results if "error" not in r]),
                    "failed_crawls": len([r for r in crawl_results if "error" in r]),
                    "total_content_chars": sum(r.get("content_length", 0) for r in crawl_results)
                },
                "timestamp": get_utc_timestamp()
            }

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_crawl_{get_utc_timestamp()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )

        except Exception as e:
            logger.error(f"Error crawling web pages: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )

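    # Example input document for crawl (plain text; URLs may be separated by
    # newline, comma, or semicolon):
    #
    #   https://example.com/articles/1
    #   https://example.com/articles/2, https://example.com/articles/3
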
    @action
    async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Scrape specific data from web pages

        Parameters:
            url (str): URL to scrape
            selectors (Dict[str, str], optional): CSS selectors for data extraction;
                if omitted, common elements (title, main content, headings, links, images) are auto-extracted
            format (str, optional): Output format (default: "json")
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
        """
        try:
            url = parameters.get("url")
            selectors = parameters.get("selectors")
            output_format = parameters.get("format", "json")
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not url:
                return ActionResult(
                    success=False,
                    error="URL is required"
                )

            # Read the URL
            soup = self._readUrl(url)
            if not soup:
                return ActionResult(
                    success=False,
                    error="Failed to read URL"
                )

            extracted_content = {}

            if selectors:
                # Extract content using the provided selectors
                for selector_name, selector in selectors.items():
                    elements = soup.select(selector)
                    if elements:
                        if output_format == "html":
                            extracted_content[selector_name] = [str(elem) for elem in elements]
                        else:
                            extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
                    else:
                        extracted_content[selector_name] = []
            else:
                # Auto-extract common elements
                extracted_content = {
                    "title": self._extractTitle(soup, url),
                    "main_content": self._extractMainContent(soup),
                    "headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
                    "links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
                    "images": [img.get('src') for img in soup.find_all('img', src=True)]
                }

            scrape_result = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "content": extracted_content,
                "timestamp": get_utc_timestamp()
            }

            # Create result data
            result_data = {
                "url": url,
                "selectors": selectors,
                "format": output_format,
                "scrapedData": scrape_result,
                "timestamp": get_utc_timestamp()
            }

            # Determine output format based on expected formats
            output_extension = f".{output_format}"  # Default to the format parameter
            output_mime_type = "application/json"  # Default

            if expectedDocumentFormats:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", f".{output_format}")
                output_mime_type = expected_format.get("mimeType", "application/json")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info(f"No expected format specified, using format parameter: {output_format}")

            return ActionResult(
                success=True,
                documents=[
                    {
                        "documentName": f"web_scrape_{get_utc_timestamp()}{output_extension}",
                        "documentData": result_data,
                        "mimeType": output_mime_type
                    }
                ]
            )

        except Exception as e:
            logger.error(f"Error scraping web page: {str(e)}")
            return ActionResult(
                success=False,
                error=str(e)
            )
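

# Minimal local smoke test (a sketch only; assumes MethodBase tolerates a stub
# service center and that APP_CONFIG provides a SerpAPI key):
#
#   import asyncio
#
#   async def _demo():
#       web = MethodWeb(serviceCenter=None)
#       result = await web.search({"query": "site:python.org asyncio"})
#       print(result)
#
#   if __name__ == "__main__":
#       asyncio.run(_demo())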
|