"""
Web method module.

Handles web operations (crawling, scraping, searching) using the web service.
"""
|
|
|
|
import copy
import json  # Added for JSON parsing
import logging
import time
import uuid
from datetime import datetime, UTC
from typing import Dict, Any, List, Optional

import requests
from bs4 import BeautifulSoup

from modules.chat.methodBase import MethodBase, ActionResult, action
from modules.shared.configuration import APP_CONFIG
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MethodWeb(MethodBase):
|
|
"""Web method implementation for web operations"""
|
|
|
|
    def __init__(self, serviceCenter: Any):
        """Initialize the web method.

        Args:
            serviceCenter: Service container handed to ``MethodBase``;
                presumably exposed as ``self.service``, which the AI research
                helpers call later — TODO confirm against MethodBase.
        """
        super().__init__(serviceCenter)
        self.name = "web"
        self.description = "Handle web operations like crawling and scraping"

        # Web search configuration from agentWebcrawler (SerpAPI settings)
        self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY", "")
        self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE", "google")
        self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY", "auto")
        # Maximum number of search results to request (config value is a string)
        self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))

        # Search degrades without a key; warn once at construction time
        if not self.srcApikey:
            logger.warning("SerpAPI key not configured for web search")

        # Default fetch identity and timeout; note _readUrl builds its own,
        # richer header set and does not read self.user_agent
        self.user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        self.timeout = 30  # seconds, applied to every HTTP request
def _readUrl(self, url: str) -> BeautifulSoup:
|
|
"""Read a URL and return a BeautifulSoup parser for the content with enhanced error handling"""
|
|
if not url or not url.startswith(('http://', 'https://')):
|
|
logger.error(f"Invalid URL: {url}")
|
|
return None
|
|
|
|
# Enhanced headers to mimic real browser
|
|
headers = {
|
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8',
|
|
'Accept-Language': 'en-US,en;q=0.9,de;q=0.8',
|
|
'Accept-Encoding': 'gzip, deflate, br',
|
|
'DNT': '1',
|
|
'Connection': 'keep-alive',
|
|
'Upgrade-Insecure-Requests': '1',
|
|
'Sec-Fetch-Dest': 'document',
|
|
'Sec-Fetch-Mode': 'navigate',
|
|
'Sec-Fetch-Site': 'none',
|
|
'Cache-Control': 'max-age=0'
|
|
}
|
|
|
|
try:
|
|
# Use session for better connection handling
|
|
session = requests.Session()
|
|
session.headers.update(headers)
|
|
|
|
# Initial request with allow_redirects
|
|
response = session.get(url, timeout=self.timeout, allow_redirects=True)
|
|
|
|
# Handle various status codes
|
|
if response.status_code == 200:
|
|
# Success - parse content
|
|
logger.debug(f"Successfully read URL: {url}")
|
|
return BeautifulSoup(response.text, 'html.parser')
|
|
|
|
elif response.status_code == 202:
|
|
# Accepted - retry with backoff
|
|
logger.info(f"Status 202 for {url}, retrying with backoff...")
|
|
backoff_times = [1.0, 2.0, 5.0, 10.0]
|
|
|
|
for wait_time in backoff_times:
|
|
time.sleep(wait_time)
|
|
retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
|
|
|
|
if retry_response.status_code == 200:
|
|
logger.debug(f"Successfully read URL after retry: {url}")
|
|
return BeautifulSoup(retry_response.text, 'html.parser')
|
|
elif retry_response.status_code != 202:
|
|
break
|
|
|
|
logger.warning(f"Failed to read URL after retries: {url}")
|
|
return None
|
|
|
|
elif response.status_code in [301, 302, 307, 308]:
|
|
# Redirect - should be handled by allow_redirects=True
|
|
logger.warning(f"Unexpected redirect status {response.status_code} for {url}")
|
|
return None
|
|
|
|
elif response.status_code == 403:
|
|
# Forbidden - try with different user agent
|
|
logger.warning(f"403 Forbidden for {url}, trying with different user agent...")
|
|
headers['User-Agent'] = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
|
|
session.headers.update(headers)
|
|
|
|
retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
|
|
if retry_response.status_code == 200:
|
|
logger.debug(f"Successfully read URL with different user agent: {url}")
|
|
return BeautifulSoup(retry_response.text, 'html.parser')
|
|
else:
|
|
logger.error(f"Still getting {retry_response.status_code} for {url}")
|
|
return None
|
|
|
|
elif response.status_code == 429:
|
|
# Rate limited - wait and retry
|
|
logger.warning(f"Rate limited for {url}, waiting 30 seconds...")
|
|
time.sleep(30)
|
|
retry_response = session.get(url, timeout=self.timeout, allow_redirects=True)
|
|
if retry_response.status_code == 200:
|
|
logger.debug(f"Successfully read URL after rate limit: {url}")
|
|
return BeautifulSoup(retry_response.text, 'html.parser')
|
|
else:
|
|
logger.error(f"Still getting {retry_response.status_code} after rate limit wait for {url}")
|
|
return None
|
|
|
|
else:
|
|
# Other error status codes
|
|
logger.error(f"HTTP {response.status_code} for {url}")
|
|
return None
|
|
|
|
except requests.exceptions.Timeout:
|
|
logger.error(f"Timeout reading URL: {url}")
|
|
return None
|
|
except requests.exceptions.ConnectionError:
|
|
logger.error(f"Connection error reading URL: {url}")
|
|
return None
|
|
except requests.exceptions.RequestException as e:
|
|
logger.error(f"Request error reading URL {url}: {str(e)}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error reading URL {url}: {str(e)}")
|
|
return None
|
|
|
|
def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
|
|
"""Extract the title from a webpage"""
|
|
if not soup:
|
|
return f"Error with {url}"
|
|
|
|
# Extract title from title tag
|
|
title_tag = soup.find('title')
|
|
title = title_tag.text.strip() if title_tag else "No title"
|
|
|
|
# Alternative: Also look for h1 tags if title tag is missing
|
|
if title == "No title":
|
|
h1_tag = soup.find('h1')
|
|
if h1_tag:
|
|
title = h1_tag.text.strip()
|
|
|
|
return title
|
|
|
|
def _extractMainContent(self, soup: BeautifulSoup, max_chars: int = 50000) -> str:
|
|
"""Extract the main content from an HTML page with enhanced content detection"""
|
|
if not soup:
|
|
return ""
|
|
|
|
# Try to find main content elements in priority order with more selectors
|
|
main_content = None
|
|
content_selectors = [
|
|
'main',
|
|
'article',
|
|
'#content',
|
|
'.content',
|
|
'#main',
|
|
'.main',
|
|
'.post-content',
|
|
'.entry-content',
|
|
'.article-content',
|
|
'.page-content',
|
|
'[role="main"]',
|
|
'.container',
|
|
'.wrapper'
|
|
]
|
|
|
|
for selector in content_selectors:
|
|
content = soup.select_one(selector)
|
|
if content:
|
|
main_content = content
|
|
logger.debug(f"Found main content using selector: {selector}")
|
|
break
|
|
|
|
# If no main content found, use the body
|
|
if not main_content:
|
|
main_content = soup.find('body') or soup
|
|
logger.debug("Using body as main content")
|
|
|
|
# Create a copy to avoid modifying the original
|
|
content_copy = main_content.copy()
|
|
|
|
# Remove elements that don't contribute to main content (less aggressive)
|
|
elements_to_remove = [
|
|
'script', 'style', 'noscript',
|
|
'nav', 'footer', 'header', 'aside',
|
|
'.sidebar', '#sidebar', '.comments', '#comments',
|
|
'.advertisement', '.ads', '.ad', '.banner',
|
|
'iframe', '.social-share', '.share-buttons',
|
|
'.breadcrumb', '.breadcrumbs', '.pagination',
|
|
'.related-posts', '.related-articles',
|
|
'.newsletter', '.subscribe', '.signup',
|
|
'.cookie-notice', '.privacy-notice',
|
|
'.popup', '.modal', '.overlay'
|
|
]
|
|
|
|
for selector in elements_to_remove:
|
|
for element in content_copy.select(selector):
|
|
element.extract()
|
|
|
|
# Extract text content with better formatting
|
|
text_content = content_copy.get_text(separator='\n', strip=True)
|
|
|
|
# Clean up the text
|
|
lines = text_content.split('\n')
|
|
cleaned_lines = []
|
|
|
|
for line in lines:
|
|
line = line.strip()
|
|
if line and len(line) > 10: # Only keep meaningful lines
|
|
cleaned_lines.append(line)
|
|
|
|
# Join lines with proper spacing
|
|
cleaned_content = '\n\n'.join(cleaned_lines)
|
|
|
|
# If content is too short, try alternative extraction
|
|
if len(cleaned_content) < 500:
|
|
logger.debug("Content too short, trying alternative extraction...")
|
|
|
|
# Try to extract from all paragraphs
|
|
paragraphs = soup.find_all(['p', 'div', 'section'])
|
|
alt_content = []
|
|
|
|
for p in paragraphs:
|
|
text = p.get_text(strip=True)
|
|
if text and len(text) > 20: # Only meaningful paragraphs
|
|
alt_content.append(text)
|
|
|
|
if alt_content:
|
|
cleaned_content = '\n\n'.join(alt_content[:20]) # Limit to first 20 paragraphs
|
|
|
|
# Limit to max_chars but preserve complete sentences
|
|
if len(cleaned_content) > max_chars:
|
|
# Try to cut at a sentence boundary
|
|
sentences = cleaned_content.split('. ')
|
|
truncated_content = ""
|
|
|
|
for sentence in sentences:
|
|
if len(truncated_content + sentence) < max_chars:
|
|
truncated_content += sentence + ". "
|
|
else:
|
|
break
|
|
|
|
cleaned_content = truncated_content.strip()
|
|
|
|
logger.debug(f"Extracted {len(cleaned_content)} characters of content")
|
|
return cleaned_content
|
|
|
|
def _checkAccessibility(self, soup: BeautifulSoup) -> Dict[str, Any]:
|
|
"""Check basic accessibility features"""
|
|
issues = []
|
|
warnings = []
|
|
|
|
# Check for alt text on images
|
|
images_without_alt = soup.find_all('img', alt='')
|
|
if images_without_alt:
|
|
issues.append(f"Found {len(images_without_alt)} images without alt text")
|
|
|
|
# Check for proper heading structure
|
|
headings = soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6'])
|
|
if not headings:
|
|
warnings.append("No headings found - poor document structure")
|
|
|
|
# Check for form labels
|
|
forms = soup.find_all('form')
|
|
for form in forms:
|
|
inputs = form.find_all('input')
|
|
for input_elem in inputs:
|
|
if input_elem.get('type') not in ['submit', 'button', 'hidden']:
|
|
if not input_elem.get('id') or not soup.find('label', attrs={'for': input_elem.get('id')}):
|
|
warnings.append("Form input without proper label")
|
|
|
|
return {
|
|
"status": "warning" if warnings else "pass",
|
|
"issues": issues,
|
|
"warnings": warnings
|
|
}
|
|
|
|
def _checkSEO(self, soup: BeautifulSoup) -> Dict[str, Any]:
|
|
"""Check basic SEO features"""
|
|
issues = []
|
|
warnings = []
|
|
|
|
# Check for title tag
|
|
title = soup.find('title')
|
|
if not title:
|
|
issues.append("Missing title tag")
|
|
elif len(title.get_text()) < 10:
|
|
warnings.append("Title tag is too short")
|
|
elif len(title.get_text()) > 60:
|
|
warnings.append("Title tag is too long")
|
|
|
|
# Check for meta description
|
|
meta_desc = soup.find('meta', attrs={'name': 'description'})
|
|
if not meta_desc:
|
|
warnings.append("Missing meta description")
|
|
elif meta_desc.get('content'):
|
|
if len(meta_desc.get('content')) < 50:
|
|
warnings.append("Meta description is too short")
|
|
elif len(meta_desc.get('content')) > 160:
|
|
warnings.append("Meta description is too long")
|
|
|
|
# Check for h1 tag
|
|
h1_tags = soup.find_all('h1')
|
|
if not h1_tags:
|
|
warnings.append("No H1 tag found")
|
|
elif len(h1_tags) > 1:
|
|
warnings.append("Multiple H1 tags found")
|
|
|
|
return {
|
|
"status": "warning" if warnings else "pass",
|
|
"issues": issues,
|
|
"warnings": warnings
|
|
}
|
|
|
|
def _checkPerformance(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
|
|
"""Check basic performance indicators"""
|
|
warnings = []
|
|
|
|
# Count images
|
|
images = soup.find_all('img')
|
|
if len(images) > 20:
|
|
warnings.append(f"Many images found ({len(images)}) - may impact loading speed")
|
|
|
|
# Check for external resources
|
|
external_scripts = soup.find_all('script', src=True)
|
|
external_styles = soup.find_all('link', rel='stylesheet')
|
|
|
|
if len(external_scripts) > 10:
|
|
warnings.append(f"Many external scripts ({len(external_scripts)}) - may impact loading speed")
|
|
|
|
if len(external_styles) > 5:
|
|
warnings.append(f"Many external stylesheets ({len(external_styles)}) - may impact loading speed")
|
|
|
|
return {
|
|
"status": "warning" if warnings else "pass",
|
|
"warnings": warnings,
|
|
"metrics": {
|
|
"images": len(images),
|
|
"external_scripts": len(external_scripts),
|
|
"external_styles": len(external_styles)
|
|
}
|
|
}
|
|
|
|
def _detectJavaScriptRendering(self, soup: BeautifulSoup) -> bool:
|
|
"""Detect if a page likely requires JavaScript rendering"""
|
|
if not soup:
|
|
return False
|
|
|
|
# Check for common indicators of JavaScript-rendered content
|
|
indicators = [
|
|
# Angular, React, Vue indicators
|
|
soup.find('div', {'ng-app': True}),
|
|
soup.find('div', {'id': 'root'}),
|
|
soup.find('div', {'id': 'app'}),
|
|
soup.find('div', {'id': 'react-root'}),
|
|
|
|
# SPA indicators
|
|
soup.find('div', {'id': 'spa-root'}),
|
|
soup.find('div', {'class': 'spa-container'}),
|
|
|
|
# Modern framework indicators
|
|
soup.find('div', {'data-reactroot': True}),
|
|
soup.find('div', {'data-ng-controller': True}),
|
|
|
|
# Empty content with scripts
|
|
len(soup.get_text(strip=True)) < 100 and len(soup.find_all('script')) > 2
|
|
]
|
|
|
|
return any(indicators)
|
|
|
|
def _extractMetaInformation(self, soup: BeautifulSoup, url: str) -> Dict[str, Any]:
|
|
"""Extract meta information from the page"""
|
|
meta_info = {
|
|
"url": url,
|
|
"title": self._extractTitle(soup, url),
|
|
"description": "",
|
|
"keywords": "",
|
|
"author": "",
|
|
"language": "",
|
|
"robots": "",
|
|
"viewport": "",
|
|
"charset": "",
|
|
"canonical": ""
|
|
}
|
|
|
|
# Extract meta tags
|
|
meta_tags = soup.find_all('meta')
|
|
for meta in meta_tags:
|
|
name = meta.get('name', '').lower()
|
|
property = meta.get('property', '').lower()
|
|
content = meta.get('content', '')
|
|
|
|
if name == 'description' or property == 'og:description':
|
|
meta_info['description'] = content
|
|
elif name == 'keywords':
|
|
meta_info['keywords'] = content
|
|
elif name == 'author':
|
|
meta_info['author'] = content
|
|
elif name == 'language':
|
|
meta_info['language'] = content
|
|
elif name == 'robots':
|
|
meta_info['robots'] = content
|
|
elif name == 'viewport':
|
|
meta_info['viewport'] = content
|
|
elif property == 'og:title':
|
|
meta_info['title'] = content
|
|
elif property == 'og:url':
|
|
meta_info['canonical'] = content
|
|
|
|
# Extract charset
|
|
charset_meta = soup.find('meta', charset=True)
|
|
if charset_meta:
|
|
meta_info['charset'] = charset_meta.get('charset', '')
|
|
|
|
# Extract canonical URL
|
|
canonical_link = soup.find('link', rel='canonical')
|
|
if canonical_link:
|
|
meta_info['canonical'] = canonical_link.get('href', '')
|
|
|
|
return meta_info
|
|
|
|
def _getAlternativeApproaches(self, url: str, requires_js: bool, content_length: int) -> List[str]:
|
|
"""Get alternative approaches for sites that are difficult to crawl"""
|
|
approaches = []
|
|
|
|
if requires_js:
|
|
approaches.extend([
|
|
"Site requires JavaScript rendering - consider using a headless browser",
|
|
"Try accessing the site's API endpoints directly",
|
|
"Look for RSS feeds or sitemaps",
|
|
"Check if the site has a mobile version that's easier to parse"
|
|
])
|
|
|
|
if content_length < 100:
|
|
approaches.extend([
|
|
"Site may have anti-bot protection - try with different user agents",
|
|
"Check if the site requires authentication",
|
|
"Look for alternative URLs (www vs non-www, http vs https)",
|
|
"Try accessing the site's robots.txt for crawling guidelines"
|
|
])
|
|
|
|
# Add general suggestions
|
|
approaches.extend([
|
|
"Use the web.search action to find alternative sources",
|
|
"Try the web.scrape action with specific CSS selectors",
|
|
"Check if the site has a public API or data export"
|
|
])
|
|
|
|
return approaches
|
|
|
|
async def _tryAdvancedAIWebResearch(self, action_type: str, parameters: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Try to get web research results using advanced AI first
|
|
|
|
Args:
|
|
action_type: Type of action ('crawl', 'scrape', or 'search')
|
|
parameters: Action parameters
|
|
|
|
Returns:
|
|
Dict with AI results if successful, None if AI call fails
|
|
"""
|
|
try:
|
|
# Create appropriate prompt based on action type
|
|
if action_type == "crawl":
|
|
prompt = self._createCrawlAIPrompt(parameters)
|
|
elif action_type == "scrape":
|
|
prompt = self._createScrapeAIPrompt(parameters)
|
|
elif action_type == "search":
|
|
prompt = self._createSearchAIPrompt(parameters)
|
|
else:
|
|
logger.warning(f"Unknown action type for AI research: {action_type}")
|
|
return None
|
|
|
|
# Try advanced AI call
|
|
if hasattr(self.service, 'callAiTextAdvanced'):
|
|
logger.info(f"Attempting advanced AI web research for {action_type}")
|
|
response = await self.service.callAiTextAdvanced(prompt)
|
|
|
|
# Parse the AI response
|
|
parsed_result = self._parseAIWebResponse(response, action_type)
|
|
if parsed_result:
|
|
logger.info(f"Advanced AI web research successful for {action_type}")
|
|
return parsed_result
|
|
else:
|
|
logger.warning(f"Failed to parse AI response for {action_type}")
|
|
return None
|
|
else:
|
|
logger.warning("Service does not have callAiTextAdvanced method")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Advanced AI web research failed for {action_type}: {str(e)}")
|
|
return None
|
|
|
|
    def _createCrawlAIPrompt(self, parameters: Dict[str, Any]) -> str:
        """Build the AI prompt for a crawl request.

        The prompt asks the model to answer from training knowledge and to
        return its answer in the same JSON shape the live-crawl path produces,
        so _parseAIWebResponse can validate either one.

        Args:
            parameters: Crawl action parameters (urls, maxDepth, includeImages,
                followLinks).

        Returns:
            The fully interpolated prompt string.
        """
        urls = parameters.get("urls", [])
        maxDepth = parameters.get("maxDepth", 2)
        includeImages = parameters.get("includeImages", False)
        followLinks = parameters.get("followLinks", True)

        # NOTE: the doubled braces {{ }} are literal JSON braces in the f-string
        prompt = f"""
You are an advanced AI research assistant with comprehensive knowledge about websites, companies, and online content. Please provide detailed information about the following URLs based on your extensive training data and knowledge.

URLs to research: {urls}
Max depth: {maxDepth}
Include images: {includeImages}
Follow links: {followLinks}

For each URL, please provide comprehensive information including:
1. Company/organization information and background
2. Main business activities and services
3. Key personnel and leadership
4. Contact information and locations
5. Recent news and developments
6. Industry analysis and market position
7. Related companies and partnerships
8. Website structure and key pages
9. Business model and revenue streams
10. Regulatory compliance and certifications

For each URL, provide:
- url: The original URL
- title: Company/organization name
- content: Comprehensive description and analysis
- content_length: Number of characters in content
- meta_info: Business information object
- links: Related companies and important connections
- images: Company logos or key visuals if known
- requires_javascript: Boolean (usually false for static info)
- alternative_approaches: Additional research suggestions
- timestamp: Current timestamp

Return the results in this exact JSON format:
{{
    "urls": {urls},
    "maxDepth": {maxDepth},
    "includeImages": {includeImages},
    "followLinks": {followLinks},
    "crawlResults": [
        {{
            "url": "url_here",
            "depth": {maxDepth},
            "followLinks": {followLinks},
            "extractContent": true,
            "title": "company_name",
            "content": "comprehensive_company_analysis",
            "content_length": 1234,
            "meta_info": {{
                "url": "url_here",
                "title": "company_name",
                "description": "business_description",
                "keywords": "industry_keywords",
                "author": "company_info",
                "language": "language_code",
                "robots": "robots_info",
                "viewport": "viewport_info",
                "charset": "charset_info",
                "canonical": "canonical_url"
            }},
            "links": [
                {{
                    "url": "related_company_url",
                    "text": "company_name"
                }}
            ],
            "images": [
                {{
                    "src": "logo_url",
                    "alt": "company_logo",
                    "title": "company_name",
                    "width": "width_value",
                    "height": "height_value"
                }}
            ],
            "requires_javascript": false,
            "alternative_approaches": ["approach1", "approach2"],
            "timestamp": "2024-01-01T00:00:00Z"
        }}
    ],
    "summary": {{
        "total_urls": {len(urls)},
        "successful_crawls": 0,
        "failed_crawls": 0,
        "total_content_chars": 0
    }},
    "timestamp": "2024-01-01T00:00:00Z"
}}

Please provide accurate, comprehensive information about each company/organization based on your knowledge. If you don't have specific information about a URL, provide general industry analysis and suggest alternative research approaches.
"""
        return prompt
    def _createScrapeAIPrompt(self, parameters: Dict[str, Any]) -> str:
        """Build the AI prompt for a scrape request.

        Args:
            parameters: Scrape action parameters (url, selectors, format).

        Returns:
            The fully interpolated prompt string, instructing the model to
            emit the same JSON shape _parseAIWebResponse expects ("scrapedData").
        """
        url = parameters.get("url")
        # `selectors` and `format` are interpolated verbatim into the template
        selectors = parameters.get("selectors", {})
        format = parameters.get("format", "json")

        prompt = f"""
You are an advanced AI research assistant with comprehensive knowledge about websites, companies, and online content. Please provide detailed information about the following URL and the specific data requested based on your extensive training data and knowledge.

URL to research: {url}
Data selectors: {selectors}
Output format: {format}

Please provide comprehensive information including:
1. Company/organization background and history
2. Business activities and services offered
3. Key personnel and leadership information
4. Financial information and performance data
5. Market position and competitive analysis
6. Recent news and developments
7. Contact information and locations
8. Industry trends and insights
9. Related companies and partnerships
10. Regulatory and compliance information

For each data selector requested, provide relevant information in the specified format (text, html, or json).

Return the results in this exact JSON format:
{{
    "url": "{url}",
    "selectors": {selectors},
    "format": "{format}",
    "scrapedData": {{
        "url": "{url}",
        "selectors": {selectors},
        "format": "{format}",
        "content": {{
            "company_info": ["comprehensive_company_analysis"],
            "business_activities": ["detailed_business_description"],
            "leadership": ["key_personnel_information"],
            "financial_data": ["financial_performance_analysis"],
            "market_position": ["competitive_analysis"],
            "recent_news": ["latest_developments"],
            "contact_info": ["contact_details"],
            "industry_insights": ["market_trends"],
            "partnerships": ["related_companies"],
            "compliance": ["regulatory_information"]
        }},
        "timestamp": "2024-01-01T00:00:00Z"
    }},
    "timestamp": "2024-01-01T00:00:00Z"
}}

Please provide accurate, comprehensive information about the company/organization based on your knowledge. If you don't have specific information about the URL, provide general industry analysis and suggest alternative research approaches.
"""
        return prompt
    def _createSearchAIPrompt(self, parameters: Dict[str, Any]) -> str:
        """Build the AI prompt for a search request.

        Args:
            parameters: Search action parameters (query, engine, maxResults,
                filter).

        Returns:
            The fully interpolated prompt string, instructing the model to
            emit the JSON shape _parseAIWebResponse expects ("searchResults").
        """
        query = parameters.get("query")
        engine = parameters.get("engine", "google")
        maxResults = parameters.get("maxResults", 10)
        filter = parameters.get("filter")

        prompt = f"""
You are an advanced AI research assistant with comprehensive knowledge about companies, industries, and business information. Please provide detailed information about the following search query based on your extensive training data and knowledge.

Search query: {query}
Search engine: {engine}
Max results: {maxResults}
Filter: {filter}

Please provide comprehensive research results including:
1. Relevant company/organization information
2. Industry analysis and market insights
3. Key personnel and leadership details
4. Business activities and services
5. Financial performance and metrics
6. Recent news and developments
7. Competitive landscape analysis
8. Market trends and opportunities
9. Regulatory and compliance information
10. Related companies and partnerships

For each search result, provide:
- title: Company/organization name
- url: Official website or primary source
- snippet: Brief description and key highlights
- content: Comprehensive analysis and insights

Return the results in this exact JSON format:
{{
    "query": "{query}",
    "engine": "{engine}",
    "maxResults": {maxResults},
    "filter": "{filter}",
    "searchResults": {{
        "query": "{query}",
        "maxResults": {maxResults},
        "results": [
            {{
                "title": "company_name",
                "url": "official_website",
                "snippet": "brief_description",
                "content": "comprehensive_analysis"
            }}
        ],
        "totalFound": 0,
        "timestamp": "2024-01-01T00:00:00Z"
    }},
    "timestamp": "2024-01-01T00:00:00Z"
}}

Please provide accurate, comprehensive information about the search query based on your knowledge. If you don't have specific information about the query, provide general industry analysis and suggest alternative research approaches.
"""
        return prompt
def _parseAIWebResponse(self, response: str, action_type: str) -> Optional[Dict[str, Any]]:
|
|
"""Parse AI response into structured data"""
|
|
try:
|
|
# Extract JSON from response
|
|
json_start = response.find('{')
|
|
json_end = response.rfind('}') + 1
|
|
if json_start == -1 or json_end == 0:
|
|
logger.warning(f"No JSON found in AI response: {response}")
|
|
return None
|
|
|
|
json_str = response[json_start:json_end]
|
|
parsed_data = json.loads(json_str)
|
|
|
|
# Validate basic structure based on action type
|
|
if action_type == "crawl":
|
|
if "crawlResults" not in parsed_data:
|
|
logger.warning("Invalid crawl response structure")
|
|
return None
|
|
elif action_type == "scrape":
|
|
if "scrapedData" not in parsed_data:
|
|
logger.warning("Invalid scrape response structure")
|
|
return None
|
|
elif action_type == "search":
|
|
if "searchResults" not in parsed_data:
|
|
logger.warning("Invalid search response structure")
|
|
return None
|
|
|
|
return parsed_data
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Failed to parse AI response JSON: {str(e)}")
|
|
return None
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing AI response: {str(e)}")
|
|
return None
|
|
|
|
    @action
    async def crawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Crawl web pages and extract content with enhanced error handling and content detection

        Parameters:
            urls (List[str]): List of URLs to crawl
            maxDepth (int, optional): Maximum crawl depth (default: 2)
            includeImages (bool, optional): Whether to include images (default: False)
            followLinks (bool, optional): Whether to follow links (default: True)
            expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description

        Returns:
            ActionResult whose data holds a single "documents" entry containing
            the aggregated crawl results (AI-generated or fetched live).
        """
        try:
            urls = parameters.get("urls")
            maxDepth = parameters.get("maxDepth", 2)
            includeImages = parameters.get("includeImages", False)
            followLinks = parameters.get("followLinks", True)
            expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])

            if not urls:
                return self._createResult(
                    success=False,
                    data={},
                    error="URLs are required"
                )

            # Try advanced AI research first; live crawling is the fallback
            ai_result = await self._tryAdvancedAIWebResearch("crawl", parameters)
            if ai_result:
                logger.info("Using advanced AI web research for crawl")
                # Reconstruct the result data from the AI response
                result_data = {
                    "urls": ai_result.get("urls", []),
                    "maxDepth": ai_result.get("maxDepth", 2),
                    "includeImages": ai_result.get("includeImages", False),
                    "followLinks": ai_result.get("followLinks", True),
                    "crawlResults": ai_result.get("crawlResults", []),
                    "summary": ai_result.get("summary", {}),
                    "timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat())
                }
                # NOTE(review): this path always emits .json and ignores
                # expectedDocumentFormats, unlike the fallback path below —
                # confirm that is intended.
                return self._createResult(
                    success=True,
                    data={
                        "documents": [
                            {
                                "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
                                "documentData": result_data,
                                "mimeType": "application/json"
                            }
                        ]
                    }
                )
            else:
                logger.info("Advanced AI web research failed, falling back to regular web crawling")

            # Crawl each URL; per-URL failures are recorded, not raised
            crawl_results = []

            for url in urls:
                try:
                    logger.info(f"Crawling URL: {url}")

                    # Read the URL with enhanced error handling
                    soup = self._readUrl(url)
                    if not soup:
                        logger.error(f"Failed to read URL: {url}")
                        crawl_results.append({
                            "error": "Failed to read URL - check if the site is accessible and not blocking crawlers",
                            "url": url,
                            "suggestions": [
                                "Try accessing the URL directly in a browser",
                                "Check if the site requires JavaScript",
                                "Verify the URL is correct and accessible"
                            ]
                        })
                        continue

                    # Extract comprehensive information
                    title = self._extractTitle(soup, url)
                    content = self._extractMainContent(soup)
                    meta_info = self._extractMetaInformation(soup, url)

                    # Check if content is meaningful; under 100 chars is treated
                    # as a partial result and recorded with a warning
                    content_length = len(content)
                    if content_length < 100:
                        logger.warning(f"Very little content extracted from {url} ({content_length} chars)")
                        crawl_results.append({
                            "url": url,
                            "title": title,
                            "content": content,
                            "content_length": content_length,
                            "warning": "Very little content extracted - site may require JavaScript or have anti-bot protection",
                            "meta_info": meta_info,
                            "timestamp": datetime.now(UTC).isoformat()
                        })
                        continue

                    # Extract links if requested (absolute http(s) links only)
                    links = []
                    if followLinks:
                        for link in soup.find_all('a', href=True):
                            href = link.get('href')
                            if href and href.startswith(('http://', 'https://')):
                                link_text = link.get_text(strip=True)
                                if link_text:  # Only include links with text
                                    links.append({
                                        'url': href,
                                        'text': link_text[:100]
                                    })

                    # Extract images if requested
                    images = []
                    if includeImages:
                        for img in soup.find_all('img', src=True):
                            src = img.get('src')
                            if src:
                                images.append({
                                    'src': src,
                                    'alt': img.get('alt', ''),
                                    'title': img.get('title', ''),
                                    'width': img.get('width', ''),
                                    'height': img.get('height', '')
                                })

                    # Check for JavaScript rendering requirements
                    requires_js = self._detectJavaScriptRendering(soup)

                    # Get alternative approaches if needed
                    alternative_approaches = self._getAlternativeApproaches(url, requires_js, content_length)

                    crawl_results.append({
                        "url": url,
                        "depth": maxDepth,
                        "followLinks": followLinks,
                        "extractContent": True,
                        "title": title,
                        "content": content,
                        "content_length": content_length,
                        "meta_info": meta_info,
                        "links": links[:20],  # Limit to first 20 links
                        "images": images[:20],  # Limit to first 20 images
                        "requires_javascript": requires_js,
                        "alternative_approaches": alternative_approaches,
                        "timestamp": datetime.now(UTC).isoformat()
                    })

                    logger.info(f"Successfully crawled {url} - extracted {content_length} characters")

                except Exception as e:
                    logger.error(f"Error crawling web page {url}: {str(e)}")
                    crawl_results.append({
                        "error": str(e),
                        "url": url,
                        "suggestions": [
                            "Check if the URL is accessible",
                            "Try with a different user agent",
                            "Verify the site doesn't block automated access"
                        ]
                    })

            # Create result data with an aggregate summary
            result_data = {
                "urls": urls,
                "maxDepth": maxDepth,
                "includeImages": includeImages,
                "followLinks": followLinks,
                "crawlResults": crawl_results,
                "summary": {
                    "total_urls": len(urls),
                    "successful_crawls": len([r for r in crawl_results if "error" not in r]),
                    "failed_crawls": len([r for r in crawl_results if "error" in r]),
                    "total_content_chars": sum([r.get("content_length", 0) for r in crawl_results if "content_length" in r])
                },
                "timestamp": datetime.now(UTC).isoformat()
            }

            # Determine output format based on expected formats
            output_extension = ".json"  # Default
            output_mime_type = "application/json"  # Default

            if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
                # Use the first expected format
                expected_format = expectedDocumentFormats[0]
                output_extension = expected_format.get("extension", ".json")
                output_mime_type = expected_format.get("mimeType", "application/json")
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
            else:
                logger.info("No expected format specified, using default .json format")

            return self._createResult(
                success=True,
                data={
                    "documents": [
                        {
                            "documentName": f"web_crawl_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}",
                            "documentData": result_data,
                            "mimeType": output_mime_type
                        }
                    ]
                }
            )

        except Exception as e:
            logger.error(f"Error crawling web pages: {str(e)}")
            return self._createResult(
                success=False,
                data={},
                error=str(e)
            )
@action
|
|
async def scrape(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
"""
|
|
Scrape specific data from web pages
|
|
|
|
Parameters:
|
|
url (str): URL to scrape
|
|
selectors (Dict[str, str]): CSS selectors for data extraction
|
|
format (str, optional): Output format (default: "json")
|
|
expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
|
|
"""
|
|
try:
|
|
url = parameters.get("url")
|
|
selectors = parameters.get("selectors")
|
|
format = parameters.get("format", "json")
|
|
expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
|
|
|
|
if not url or not selectors:
|
|
return self._createResult(
|
|
success=False,
|
|
data={},
|
|
error="URL and selectors are required"
|
|
)
|
|
|
|
# Try advanced AI research first
|
|
ai_result = await self._tryAdvancedAIWebResearch("scrape", parameters)
|
|
if ai_result:
|
|
logger.info("Using advanced AI web research for scrape")
|
|
# Reconstruct the result data from the AI response
|
|
result_data = {
|
|
"url": ai_result.get("url"),
|
|
"selectors": ai_result.get("selectors"),
|
|
"format": ai_result.get("format"),
|
|
"scrapedData": ai_result.get("scrapedData"),
|
|
"timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat())
|
|
}
|
|
return self._createResult(
|
|
success=True,
|
|
data={
|
|
"documents": [
|
|
{
|
|
"documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
|
|
"documentData": result_data,
|
|
"mimeType": "application/json"
|
|
}
|
|
]
|
|
}
|
|
)
|
|
else:
|
|
logger.info("Advanced AI web research failed, falling back to regular web scraping")
|
|
|
|
# Read the URL
|
|
soup = self._readUrl(url)
|
|
if not soup:
|
|
return self._createResult(
|
|
success=False,
|
|
data={},
|
|
error="Failed to read URL"
|
|
)
|
|
|
|
extracted_content = {}
|
|
|
|
if selectors:
|
|
# Extract content using provided selectors
|
|
for selector_name, selector in selectors.items():
|
|
elements = soup.select(selector)
|
|
if elements:
|
|
if format == "text":
|
|
extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
|
|
elif format == "html":
|
|
extracted_content[selector_name] = [str(elem) for elem in elements]
|
|
else:
|
|
extracted_content[selector_name] = [elem.get_text(strip=True) for elem in elements]
|
|
else:
|
|
extracted_content[selector_name] = []
|
|
else:
|
|
# Auto-extract common elements
|
|
extracted_content = {
|
|
"title": self._extractTitle(soup, url),
|
|
"main_content": self._extractMainContent(soup),
|
|
"headings": [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])],
|
|
"links": [a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith(('http://', 'https://'))],
|
|
"images": [img.get('src') for img in soup.find_all('img', src=True)]
|
|
}
|
|
|
|
scrape_result = {
|
|
"url": url,
|
|
"selectors": selectors,
|
|
"format": format,
|
|
"content": extracted_content,
|
|
"timestamp": datetime.now(UTC).isoformat()
|
|
}
|
|
|
|
# Create result data
|
|
result_data = {
|
|
"url": url,
|
|
"selectors": selectors,
|
|
"format": format,
|
|
"scrapedData": scrape_result,
|
|
"timestamp": datetime.now(UTC).isoformat()
|
|
}
|
|
|
|
# Determine output format based on expected formats
|
|
output_extension = f".{format}" # Default to format parameter
|
|
output_mime_type = "application/json" # Default
|
|
|
|
if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
|
|
# Use the first expected format
|
|
expected_format = expectedDocumentFormats[0]
|
|
output_extension = expected_format.get("extension", f".{format}")
|
|
output_mime_type = expected_format.get("mimeType", "application/json")
|
|
logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
|
|
else:
|
|
logger.info(f"No expected format specified, using format parameter: {format}")
|
|
|
|
return self._createResult(
|
|
success=True,
|
|
data={
|
|
"documents": [
|
|
{
|
|
"documentName": f"web_scrape_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}",
|
|
"documentData": result_data,
|
|
"mimeType": output_mime_type
|
|
}
|
|
]
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error scraping web page: {str(e)}")
|
|
return self._createResult(
|
|
success=False,
|
|
data={},
|
|
error=str(e)
|
|
)
|
|
|
|
@action
|
|
async def search(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
"""
|
|
Search web content
|
|
|
|
Parameters:
|
|
query (str): Search query
|
|
engine (str, optional): Search engine to use (default: "google")
|
|
maxResults (int, optional): Maximum number of results (default: 10)
|
|
filter (str, optional): Additional search filters
|
|
expectedDocumentFormats (list, optional): Expected document formats with extension, mimeType, description
|
|
"""
|
|
try:
|
|
query = parameters.get("query")
|
|
engine = parameters.get("engine", "google")
|
|
maxResults = parameters.get("maxResults", 10)
|
|
filter = parameters.get("filter")
|
|
expectedDocumentFormats = parameters.get("expectedDocumentFormats", [])
|
|
|
|
if not query:
|
|
return self._createResult(
|
|
success=False,
|
|
data={},
|
|
error="Search query is required"
|
|
)
|
|
|
|
# Try advanced AI research first
|
|
ai_result = await self._tryAdvancedAIWebResearch("search", parameters)
|
|
if ai_result:
|
|
logger.info("Using advanced AI web research for search")
|
|
# Reconstruct the result data from the AI response
|
|
result_data = {
|
|
"query": ai_result.get("query"),
|
|
"engine": ai_result.get("engine"),
|
|
"maxResults": ai_result.get("maxResults"),
|
|
"filter": ai_result.get("filter"),
|
|
"searchResults": ai_result.get("searchResults"),
|
|
"timestamp": ai_result.get("timestamp", datetime.now(UTC).isoformat())
|
|
}
|
|
return self._createResult(
|
|
success=True,
|
|
data={
|
|
"documents": [
|
|
{
|
|
"documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}.json",
|
|
"documentData": result_data,
|
|
"mimeType": "application/json"
|
|
}
|
|
]
|
|
}
|
|
)
|
|
else:
|
|
logger.info("Advanced AI web research failed, falling back to regular web search")
|
|
|
|
# Search web content using Google search via SerpAPI
|
|
try:
|
|
if not self.srcApikey:
|
|
search_result = {
|
|
"error": "SerpAPI key not configured",
|
|
"query": query
|
|
}
|
|
else:
|
|
# Get user language from service center if available
|
|
userLanguage = "en" # Default language
|
|
if hasattr(self.service, 'user') and hasattr(self.service.user, 'language'):
|
|
userLanguage = self.service.user.language
|
|
|
|
# Format the search request for SerpAPI
|
|
params = {
|
|
"engine": self.srcEngine,
|
|
"q": query,
|
|
"api_key": self.srcApikey,
|
|
"num": min(maxResults, self.maxResults), # Number of results to return
|
|
"hl": userLanguage # User language
|
|
}
|
|
|
|
# Make the API request
|
|
response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
|
|
# Parse JSON response
|
|
search_results = response.json()
|
|
|
|
# Extract organic results
|
|
results = []
|
|
|
|
if "organic_results" in search_results:
|
|
for result in search_results["organic_results"][:maxResults]:
|
|
# Extract title
|
|
title = result.get("title", "No title")
|
|
|
|
# Extract URL
|
|
url = result.get("link", "No URL")
|
|
|
|
# Extract snippet
|
|
snippet = result.get("snippet", "No description")
|
|
|
|
# Get actual page content
|
|
try:
|
|
targetPageSoup = self._readUrl(url)
|
|
content = self._extractMainContent(targetPageSoup)
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting content from {url}: {str(e)}")
|
|
content = f"Error extracting content: {str(e)}"
|
|
|
|
results.append({
|
|
'title': title,
|
|
'url': url,
|
|
'snippet': snippet,
|
|
'content': content
|
|
})
|
|
|
|
# Limit number of results
|
|
if len(results) >= maxResults:
|
|
break
|
|
else:
|
|
logger.warning(f"No organic results found in SerpAPI response for: {query}")
|
|
|
|
search_result = {
|
|
"query": query,
|
|
"maxResults": maxResults,
|
|
"results": results,
|
|
"totalFound": len(results),
|
|
"timestamp": datetime.now(UTC).isoformat()
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching web: {str(e)}")
|
|
search_result = {
|
|
"error": str(e),
|
|
"query": query
|
|
}
|
|
|
|
# Create result data
|
|
result_data = {
|
|
"query": query,
|
|
"engine": engine,
|
|
"maxResults": maxResults,
|
|
"filter": filter,
|
|
"searchResults": search_result,
|
|
"timestamp": datetime.now(UTC).isoformat()
|
|
}
|
|
|
|
# Determine output format based on expected formats
|
|
output_extension = ".json" # Default
|
|
output_mime_type = "application/json" # Default
|
|
|
|
if expectedDocumentFormats and len(expectedDocumentFormats) > 0:
|
|
# Use the first expected format
|
|
expected_format = expectedDocumentFormats[0]
|
|
output_extension = expected_format.get("extension", ".json")
|
|
output_mime_type = expected_format.get("mimeType", "application/json")
|
|
logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
|
|
else:
|
|
logger.info("No expected format specified, using default .json format")
|
|
|
|
return self._createResult(
|
|
success=True,
|
|
data={
|
|
"documents": [
|
|
{
|
|
"documentName": f"web_search_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}{output_extension}",
|
|
"documentData": result_data,
|
|
"mimeType": output_mime_type
|
|
}
|
|
]
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching web: {str(e)}")
|
|
return self._createResult(
|
|
success=False,
|
|
data={},
|
|
error=str(e)
|
|
)
|
|
|