805 lines
No EOL
31 KiB
Python
805 lines
No EOL
31 KiB
Python
"""
|
|
Webcrawler agent for research and retrieval of information from the web.
|
|
Reimagined with an output-first, AI-driven approach.
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import re
|
|
import time
|
|
from typing import Dict, Any, List
|
|
from urllib.parse import quote_plus, unquote
|
|
|
|
from bs4 import BeautifulSoup
|
|
import requests
|
|
import markdown
|
|
|
|
from modules.workflowAgentsRegistry import AgentBase
|
|
from modules.configuration import APP_CONFIG
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AgentWebcrawler(AgentBase):
|
|
"""AI-driven agent for web research and information retrieval"""
|
|
|
|
def __init__(self):
|
|
"""Initialize the webcrawler agent"""
|
|
super().__init__()
|
|
self.name = "webcrawler"
|
|
self.label = "Web-Research"
|
|
self.description = "Conducts web research and collects information from online sources"
|
|
self.capabilities = [
|
|
"webSearch",
|
|
"informationRetrieval",
|
|
"dataCollection",
|
|
"searchResultsAnalysis",
|
|
"webpageContentExtraction"
|
|
]
|
|
|
|
# Web crawling configuration
|
|
self.srcApikey = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_APIKEY","")
|
|
self.srcEngine = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_ENGINE","google")
|
|
self.srcCountry = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_COUNTRY","auto")
|
|
self.maxUrl = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_URLS", "5"))
|
|
self.maxSearchTerms = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_KEYWORDS", "3"))
|
|
self.maxResults = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_MAX_SEARCH_RESULTS", "5"))
|
|
self.timeout = int(APP_CONFIG.get("Agent_Webcrawler_SERPAPI_TIMEOUT", "30"))
|
|
self.userAgent = APP_CONFIG.get("Agent_Webcrawler_SERPAPI_USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
|
|
|
|
if not self.srcApikey:
|
|
logger.error("SerpAPI key not configured")
|
|
|
|
|
|
def setDependencies(self, mydom=None):
|
|
"""Set external dependencies for the agent."""
|
|
self.mydom = mydom
|
|
|
|
async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""
|
|
Process a task by focusing on required outputs and using AI to guide the research process.
|
|
|
|
Args:
|
|
task: Task dictionary with prompt, inputDocuments, outputSpecifications
|
|
|
|
Returns:
|
|
Dictionary with feedback and documents
|
|
"""
|
|
try:
|
|
# Extract task information
|
|
prompt = task.get("prompt", "")
|
|
outputSpecs = task.get("outputSpecifications", [])
|
|
|
|
# Check AI service
|
|
if not self.mydom:
|
|
return {
|
|
"feedback": "The Webcrawler agent requires an AI service to function effectively.",
|
|
"documents": []
|
|
}
|
|
|
|
# Create research plan
|
|
researchPlan = await self._createResearchPlan(prompt)
|
|
|
|
# Check if this is truly a web research task
|
|
if not researchPlan.get("requiresWebResearch", True):
|
|
return {
|
|
"feedback": "This task doesn't appear to require web research. Please try a different agent.",
|
|
"documents": []
|
|
}
|
|
|
|
# Gather raw material through web research
|
|
rawResults = await self._gatherResearchMaterial(researchPlan)
|
|
|
|
# Format results into requested output documents
|
|
documents = await self._createOutputDocuments(
|
|
prompt,
|
|
rawResults,
|
|
outputSpecs,
|
|
researchPlan
|
|
)
|
|
|
|
# Generate feedback
|
|
feedback = researchPlan.get("feedback", f"I conducted web research on '{prompt[:50]}...' and gathered information from {len(rawResults)} relevant sources.")
|
|
|
|
return {
|
|
"feedback": feedback,
|
|
"documents": documents
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during web research: {str(e)}", exc_info=True)
|
|
return {
|
|
"feedback": f"Error during web research: {str(e)}",
|
|
"documents": []
|
|
}
|
|
|
|
async def _createResearchPlan(self, prompt: str) -> Dict[str, Any]:
|
|
"""
|
|
Use AI to create a detailed research plan.
|
|
|
|
Args:
|
|
prompt: The research query
|
|
|
|
Returns:
|
|
Research plan dictionary
|
|
"""
|
|
researchPrompt = f"""
|
|
Create a detailed web research plan for this task: "{prompt}"
|
|
|
|
Analyze the request carefully and create a structured plan in JSON format with the following elements:
|
|
{{
|
|
"requiresWebResearch": true/false, # Whether this genuinely requires web research
|
|
"researchQuestions": ["question1", "question2", ...], # 2-4 specific questions to answer
|
|
"searchTerms": ["term1", "term2", ...], # Up to {self.maxSearchTerms} effective search terms
|
|
"directUrls": ["url1", "url2", ...], # Any URLs directly mentioned in the request (up to {self.maxUrl})
|
|
"expectedSources": ["type1", "type2", ...], # Types of sources that would be most valuable
|
|
"contentFocus": "what specific content to extract or focus on",
|
|
"feedback": "explanation of how the research will be conducted"
|
|
}}
|
|
|
|
Respond with ONLY the JSON object, no additional text or explanations.
|
|
"""
|
|
|
|
try:
|
|
# Get research plan from AI
|
|
response = await self.mydom.callAi([
|
|
{"role": "system", "content": "You are a web research planning expert. Create precise research plans in JSON format only."},
|
|
{"role": "user", "content": researchPrompt}
|
|
])
|
|
|
|
# Extract JSON
|
|
jsonStart = response.find('{')
|
|
jsonEnd = response.rfind('}') + 1
|
|
|
|
if jsonStart >= 0 and jsonEnd > jsonStart:
|
|
plan = json.loads(response[jsonStart:jsonEnd])
|
|
|
|
# Ensure we have the expected fields with defaults if missing
|
|
if "searchTerms" not in plan:
|
|
plan["searchTerms"] = [prompt]
|
|
if "directUrls" not in plan:
|
|
plan["directUrls"] = []
|
|
if "researchQuestions" not in plan:
|
|
plan["researchQuestions"] = ["What information can be found about this topic?"]
|
|
|
|
return plan
|
|
else:
|
|
# Fallback plan
|
|
logger.warning(f"Not able creating research plan, generating fallback plan")
|
|
return {
|
|
"requiresWebResearch": True,
|
|
"researchQuestions": ["What information can be found about this topic?"],
|
|
"searchTerms": [prompt],
|
|
"directUrls": [],
|
|
"expectedSources": ["Web pages", "Articles"],
|
|
"contentFocus": "Relevant information about the topic",
|
|
"feedback": f"I'll conduct web research on '{prompt}' and gather relevant information."
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error creating research plan: {str(e)}")
|
|
# Simple fallback plan
|
|
return {
|
|
"requiresWebResearch": True,
|
|
"researchQuestions": ["What information can be found about this topic?"],
|
|
"searchTerms": [prompt],
|
|
"directUrls": [],
|
|
"expectedSources": ["Web pages", "Articles"],
|
|
"contentFocus": "Relevant information about the topic",
|
|
"feedback": f"I'll conduct web research on '{prompt}' and gather relevant information."
|
|
}
|
|
|
|
async def _gatherResearchMaterial(self, researchPlan: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Gather research material based on the research plan.
|
|
|
|
Args:
|
|
researchPlan: Research plan dictionary
|
|
|
|
Returns:
|
|
List of research results
|
|
"""
|
|
allResults = []
|
|
|
|
# Process direct URLs
|
|
directUrls = researchPlan.get("directUrls", [])[:self.maxUrl]
|
|
for url in directUrls:
|
|
logger.info(f"Processing direct URL: {url}")
|
|
try:
|
|
# Fetch and extract content
|
|
soup = self._readUrl(url)
|
|
|
|
if soup:
|
|
# Extract title and content
|
|
title = self._extractTitle(soup, url)
|
|
content = self._extractMainContent(soup)
|
|
|
|
# Add to results
|
|
allResults.append({
|
|
"title": title,
|
|
"url": url,
|
|
"sourceType": "directUrl",
|
|
"content": content,
|
|
"summary": "" # Will be filled later
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Error processing URL {url}: {str(e)}")
|
|
|
|
# Process search terms
|
|
searchTerms = researchPlan.get("searchTerms", [])[:self.maxSearchTerms]
|
|
for term in searchTerms:
|
|
logger.info(f"Searching for: {term}")
|
|
try:
|
|
# Perform search
|
|
searchResults = self._searchWeb(term)
|
|
|
|
# Process each search result
|
|
for result in searchResults:
|
|
# Check if URL is already in results
|
|
if not any(r["url"] == result["url"] for r in allResults):
|
|
allResults.append({
|
|
"title": result["title"],
|
|
"url": result["url"],
|
|
"sourceType": "searchResult",
|
|
"content": result["data"],
|
|
"snippet": result["snippet"],
|
|
"summary": "" # Will be filled later
|
|
})
|
|
|
|
# Stop if we've reached the maximum results
|
|
if len(allResults) >= self.maxResults:
|
|
break
|
|
except Exception as e:
|
|
logger.warning(f"Error searching for {term}: {str(e)}")
|
|
|
|
# Stop if we've reached the maximum results
|
|
if len(allResults) >= self.maxResults:
|
|
break
|
|
|
|
# Create summaries in parallel for all results
|
|
allResults = await self._summarizeAllResults(allResults, researchPlan)
|
|
|
|
return allResults
|
|
|
|
async def _summarizeAllResults(self, results: List[Dict[str, Any]], researchPlan: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Create summaries for all research results.
|
|
|
|
Args:
|
|
results: List of research results
|
|
researchPlan: Research plan with questions and focus
|
|
|
|
Returns:
|
|
Results with added summaries
|
|
"""
|
|
for i, result in enumerate(results):
|
|
logger.info(f"Summarizing result {i+1}/{len(results)}: {result['title'][:30]}...")
|
|
|
|
try:
|
|
# Limit content length to avoid token issues
|
|
content = self._limitText(result.get("content", ""), maxChars=8000)
|
|
researchQuestions = researchPlan.get("researchQuestions", ["What relevant information does this page contain?"])
|
|
contentFocus = researchPlan.get("contentFocus", "Relevant information")
|
|
|
|
# Create summary using AI
|
|
summaryPrompt = f"""
|
|
Summarize this web page content based on these research questions:
|
|
{', '.join(researchQuestions)}
|
|
|
|
Focus on: {contentFocus}
|
|
|
|
Web page: {result['url']}
|
|
Title: {result['title']}
|
|
|
|
Content:
|
|
{content}
|
|
|
|
Create a concise summary that:
|
|
1. Directly answers the research questions if possible
|
|
2. Extracts the most relevant information from the page
|
|
3. Includes specific facts, figures, or quotes if available
|
|
4. Is around 2000 characters long
|
|
|
|
Only include information actually found in the content. No fabrications or assumptions.
|
|
"""
|
|
|
|
if self.mydom:
|
|
summary = await self.mydom.callAi([
|
|
{"role": "system", "content": "You summarize web content accurately and concisely, focusing only on what is actually in the content."},
|
|
{"role": "user", "content": summaryPrompt}
|
|
])
|
|
|
|
# Store the summary
|
|
result["summary"] = summary
|
|
else:
|
|
# Fallback if no AI service
|
|
logger.warning(f"Not able to summarize result, using fallback plan.")
|
|
result["summary"] = f"Content from {result['url']} ({len(content)} characters)"
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Error summarizing result {i+1}: {str(e)}")
|
|
result["summary"] = f"Error creating summary: {str(e)}"
|
|
|
|
return results
|
|
|
|
async def _createOutputDocuments(self, prompt: str, results: List[Dict[str, Any]],
|
|
outputSpecs: List[Dict[str, Any]], researchPlan: Dict[str, Any]) -> List[Dict[str, Any]]:
|
|
"""
|
|
Create output documents based on research results and specifications.
|
|
|
|
Args:
|
|
prompt: Original research prompt
|
|
results: List of research results
|
|
outputSpecs: Output specifications
|
|
researchPlan: Research plan
|
|
|
|
Returns:
|
|
List of output documents
|
|
"""
|
|
# If no output specs provided, create default output
|
|
if not outputSpecs:
|
|
outputSpecs = [{
|
|
"label": "webResearchResults.md",
|
|
"description": "Comprehensive web research results"
|
|
}]
|
|
|
|
# Generate documents
|
|
documents = []
|
|
|
|
# Process each output specification
|
|
for spec in outputSpecs:
|
|
outputLabel = spec.get("label", "")
|
|
outputDescription = spec.get("description", "")
|
|
|
|
# Determine format based on file extension
|
|
formatType = self._determineFormatType(outputLabel)
|
|
|
|
# Create appropriate document based on format
|
|
if formatType == "json":
|
|
# JSON output - structured data
|
|
document = await self._createJsonDocument(prompt, results, researchPlan, outputLabel)
|
|
elif formatType == "csv":
|
|
# CSV output - tabular data
|
|
document = await self._createCsvDocument(results, outputLabel)
|
|
else:
|
|
# Text-based output (markdown, html, text) - narrative report
|
|
document = await self._createNarrativeDocument(
|
|
prompt, results, researchPlan, formatType, outputLabel, outputDescription
|
|
)
|
|
|
|
documents.append(document)
|
|
|
|
return documents
|
|
|
|
async def _createNarrativeDocument(self, prompt: str, results: List[Dict[str, Any]],
|
|
researchPlan: Dict[str, Any], formatType: str,
|
|
outputLabel: str, outputDescription: str) -> Dict[str, Any]:
|
|
"""
|
|
Create a narrative document (markdown, html, text) from research results.
|
|
|
|
Args:
|
|
prompt: Original research prompt
|
|
results: Research results
|
|
researchPlan: Research plan
|
|
formatType: Output format (markdown, html, text)
|
|
outputLabel: Output filename
|
|
outputDescription: Output description
|
|
|
|
Returns:
|
|
Document object
|
|
"""
|
|
# Create content based on format
|
|
if formatType == "markdown":
|
|
contentType = "text/markdown"
|
|
templateFormat = "markdown"
|
|
elif formatType == "html":
|
|
contentType = "text/html"
|
|
templateFormat = "html"
|
|
else:
|
|
contentType = "text/plain"
|
|
templateFormat = "text"
|
|
|
|
# Prepare research context
|
|
researchQuestions = researchPlan.get("researchQuestions", [])
|
|
searchTerms = researchPlan.get("searchTerms", [])
|
|
|
|
# Create document structure based on results
|
|
sourcesSummary = []
|
|
for result in results:
|
|
sourcesSummary.append({
|
|
"title": result.get("title", "Untitled"),
|
|
"url": result.get("url", ""),
|
|
"summary": result.get("summary", ""),
|
|
"snippet": result.get("snippet", "")
|
|
})
|
|
|
|
# Truncate content for prompt
|
|
sourcesJson = json.dumps(sourcesSummary, indent=2)
|
|
if len(sourcesJson) > 10000:
|
|
# Logic to truncate each summary while preserving structure
|
|
for i in range(len(sourcesSummary)):
|
|
if len(sourcesJson) <= 10000:
|
|
break
|
|
# Gradually truncate summaries
|
|
sourcesSummary[i]["summary"] = sourcesSummary[i]["summary"][:500] + "..."
|
|
sourcesJson = json.dumps(sourcesSummary, indent=2)
|
|
|
|
# Create report prompt
|
|
reportPrompt = f"""
|
|
Create a comprehensive {formatType} research report based on the following web research:
|
|
|
|
TASK: {prompt}
|
|
|
|
RESEARCH QUESTIONS:
|
|
{', '.join(researchQuestions)}
|
|
|
|
SEARCH TERMS USED:
|
|
{', '.join(searchTerms)}
|
|
|
|
SOURCES AND FINDINGS:
|
|
{sourcesJson}
|
|
|
|
REPORT DETAILS:
|
|
- Format: {templateFormat}
|
|
- Filename: {outputLabel}
|
|
- Description: {outputDescription}
|
|
|
|
Create a well-structured report that:
|
|
1. Includes an executive summary of key findings
|
|
2. Addresses each research question directly
|
|
3. Integrates information from all relevant sources
|
|
4. Cites sources appropriately for each piece of information
|
|
5. Provides a comprehensive synthesis of the research
|
|
6. Is formatted professionally and appropriately for {templateFormat}
|
|
|
|
The report should be scholarly, accurate, and focused on the original research task.
|
|
"""
|
|
|
|
try:
|
|
# Generate report with AI
|
|
reportContent = await self.mydom.callAi([
|
|
{"role": "system", "content": f"You create professional research reports in {templateFormat} format."},
|
|
{"role": "user", "content": reportPrompt}
|
|
])
|
|
|
|
# Convert to HTML if needed
|
|
if formatType == "html" and not reportContent.lower().startswith("<html"):
|
|
# Check if it's markdown that needs conversion
|
|
if reportContent.startswith("#"):
|
|
reportContent = markdown.markdown(reportContent)
|
|
# Wrap in basic HTML structure if needed
|
|
if not reportContent.lower().startswith("<html"):
|
|
reportContent = f"<html><head><title>Web Research Results</title></head><body>{reportContent}</body></html>"
|
|
|
|
return self.formatAgentDocumentOutput(outputLabel, reportContent, contentType)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating narrative document: {str(e)}")
|
|
# Create error document
|
|
if formatType == "markdown":
|
|
content = f"# Web Research Error\n\nAn error occurred: {str(e)}"
|
|
elif formatType == "html":
|
|
content = f"<html><body><h1>Web Research Error</h1><p>An error occurred: {str(e)}</p></body></html>"
|
|
else:
|
|
content = f"WEB RESEARCH ERROR\n\nAn error occurred: {str(e)}"
|
|
|
|
return self.formatAgentDocumentOutput(outputLabel, content, contentType)
|
|
|
|
async def _createJsonDocument(self, prompt: str, results: List[Dict[str, Any]],
|
|
researchPlan: Dict[str, Any], outputLabel: str) -> Dict[str, Any]:
|
|
"""
|
|
Create a JSON document from research results.
|
|
|
|
Args:
|
|
prompt: Original research prompt
|
|
results: Research results
|
|
researchPlan: Research plan
|
|
outputLabel: Output filename
|
|
|
|
Returns:
|
|
Document object
|
|
"""
|
|
try:
|
|
# Create structured data
|
|
sourcesData = []
|
|
for result in results:
|
|
sourcesData.append({
|
|
"title": result.get("title", "Untitled"),
|
|
"url": result.get("url", ""),
|
|
"summary": result.get("summary", ""),
|
|
"snippet": result.get("snippet", ""),
|
|
"sourceType": result.get("sourceType", "")
|
|
})
|
|
|
|
# Create metadata
|
|
metadata = {
|
|
"query": prompt,
|
|
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"researchQuestions": researchPlan.get("researchQuestions", []),
|
|
"searchTerms": researchPlan.get("searchTerms", [])
|
|
}
|
|
|
|
# Compile complete report object
|
|
jsonContent = {
|
|
"metadata": metadata,
|
|
"summary": researchPlan.get("feedback", "Web research results"),
|
|
"sources": sourcesData
|
|
}
|
|
|
|
# Convert to JSON string
|
|
content = json.dumps(jsonContent, indent=2)
|
|
|
|
return self.formatAgentDocumentOutput(outputLabel, content, "application/json")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating JSON document: {str(e)}")
|
|
return self.formatAgentDocumentOutput(outputLabel, json.dumps({"error": str(e)}), "application/json")
|
|
|
|
async def _createCsvDocument(self, results: List[Dict[str, Any]], outputLabel: str) -> Dict[str, Any]:
|
|
"""
|
|
Create a CSV document from research results.
|
|
|
|
Args:
|
|
results: Research results
|
|
outputLabel: Output filename
|
|
|
|
Returns:
|
|
Document object
|
|
"""
|
|
try:
|
|
# Create CSV header
|
|
csvLines = ["Title,URL,Source Type,Snippet"]
|
|
|
|
# Add results
|
|
for result in results:
|
|
# Escape CSV fields
|
|
title = result.get("title", "").replace('"', '""')
|
|
url = result.get("url", "").replace('"', '""')
|
|
sourceType = result.get("sourceType", "").replace('"', '""')
|
|
snippet = result.get("snippet", "").replace('"', '""')
|
|
|
|
csvLines.append(f'"{title}","{url}","{sourceType}","{snippet}"')
|
|
|
|
# Combine into CSV content
|
|
content = "\n".join(csvLines)
|
|
|
|
return self.formatAgentDocumentOutput(outputLabel, content, "text/csv")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error creating CSV document: {str(e)}")
|
|
return self.formatAgentDocumentOutput(outputLabel, "Error,Error\nFailed to create CSV,{0}".format(str(e)), "text/csv")
|
|
|
|
def _determineFormatType(self, outputLabel: str) -> str:
|
|
"""
|
|
Determine the format type based on the filename.
|
|
|
|
Args:
|
|
outputLabel: Output filename
|
|
|
|
Returns:
|
|
Format type (markdown, html, text, json, csv)
|
|
"""
|
|
outputLabelLower = outputLabel.lower()
|
|
|
|
if outputLabelLower.endswith(".md"):
|
|
return "markdown"
|
|
elif outputLabelLower.endswith(".html"):
|
|
return "html"
|
|
elif outputLabelLower.endswith(".txt"):
|
|
return "text"
|
|
elif outputLabelLower.endswith(".json"):
|
|
return "json"
|
|
elif outputLabelLower.endswith(".csv"):
|
|
return "csv"
|
|
else:
|
|
# Default to markdown
|
|
return "markdown"
|
|
|
|
def _searchWeb(self, query: str) -> List[Dict[str, str]]:
|
|
"""
|
|
Conduct a web search using SerpAPI and return the results.
|
|
|
|
Args:
|
|
query: The search query
|
|
|
|
Returns:
|
|
List of search results
|
|
"""
|
|
if not self.srcApikey:
|
|
return []
|
|
|
|
# Get user language from mydom if available
|
|
userLanguage = "en" # Default language
|
|
if self.mydom.userLanguage:
|
|
userLanguage = self.mydom.userLanguage
|
|
|
|
try:
|
|
# Format the search request for SerpAPI
|
|
params = {
|
|
"engine": self.srcEngine,
|
|
"q": query,
|
|
"api_key": self.srcApikey,
|
|
"num": self.maxResults, # Number of results to return
|
|
"hl": userLanguage # Identified user language
|
|
}
|
|
|
|
# Make the API request
|
|
response = requests.get("https://serpapi.com/search", params=params, timeout=self.timeout)
|
|
response.raise_for_status()
|
|
|
|
# Parse JSON response
|
|
search_results = response.json()
|
|
|
|
# Extract organic results
|
|
results = []
|
|
|
|
if "organic_results" in search_results:
|
|
for result in search_results["organic_results"][:self.maxResults]:
|
|
# Extract title
|
|
title = result.get("title", "No title")
|
|
|
|
# Extract URL
|
|
url = result.get("link", "No URL")
|
|
|
|
# Extract snippet
|
|
snippet = result.get("snippet", "No description")
|
|
|
|
# Get actual page content
|
|
try:
|
|
targetPageSoup = self._readUrl(url)
|
|
content = self._extractMainContent(targetPageSoup)
|
|
except Exception as e:
|
|
logger.warning(f"Error extracting content from {url}: {str(e)}")
|
|
content = f"Error extracting content: {str(e)}"
|
|
|
|
results.append({
|
|
'title': title,
|
|
'url': url,
|
|
'snippet': snippet,
|
|
'data': content
|
|
})
|
|
|
|
# Limit number of results
|
|
if len(results) >= self.maxResults:
|
|
break
|
|
else:
|
|
logger.warning(f"No organic results found in SerpAPI response for: {query}")
|
|
|
|
return results
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error searching with SerpAPI for {query}: {str(e)}")
|
|
return []
|
|
|
|
def _readUrl(self, url: str) -> BeautifulSoup:
|
|
"""
|
|
Read a URL and return a BeautifulSoup parser for the content.
|
|
|
|
Args:
|
|
url: The URL to read
|
|
|
|
Returns:
|
|
BeautifulSoup object with the content or None on errors
|
|
"""
|
|
if not url or not url.startswith(('http://', 'https://')):
|
|
return None
|
|
|
|
headers = {
|
|
'User-Agent': self.userAgent,
|
|
'Accept': 'text/html,application/xhtml+xml,application/xml',
|
|
'Accept-Language': 'en-US,en;q=0.9',
|
|
}
|
|
|
|
try:
|
|
# Initial request
|
|
response = requests.get(url, headers=headers, timeout=self.timeout)
|
|
|
|
# Handling for status 202
|
|
if response.status_code == 202:
|
|
# Retry with backoff
|
|
backoffTimes = [0.5, 1.0, 2.0, 5.0]
|
|
|
|
for waitTime in backoffTimes:
|
|
time.sleep(waitTime)
|
|
response = requests.get(url, headers=headers, timeout=self.timeout)
|
|
|
|
if response.status_code != 202:
|
|
break
|
|
|
|
# Raise for error status codes
|
|
response.raise_for_status()
|
|
|
|
# Parse HTML
|
|
return BeautifulSoup(response.text, 'html.parser')
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error reading URL {url}: {str(e)}")
|
|
return None
|
|
|
|
def _extractTitle(self, soup: BeautifulSoup, url: str) -> str:
|
|
"""
|
|
Extract the title from a webpage.
|
|
|
|
Args:
|
|
soup: BeautifulSoup object of the webpage
|
|
url: URL of the webpage
|
|
|
|
Returns:
|
|
Extracted title
|
|
"""
|
|
if not soup:
|
|
return f"Error with {url}"
|
|
|
|
# Extract title from title tag
|
|
titleTag = soup.find('title')
|
|
title = titleTag.text.strip() if titleTag else "No title"
|
|
|
|
# Alternative: Also look for h1 tags if title tag is missing
|
|
if title == "No title":
|
|
h1Tag = soup.find('h1')
|
|
if h1Tag:
|
|
title = h1Tag.text.strip()
|
|
|
|
return title
|
|
|
|
def _extractMainContent(self, soup: BeautifulSoup, maxChars: int = 10000) -> str:
|
|
"""
|
|
Extract the main content from an HTML page.
|
|
|
|
Args:
|
|
soup: BeautifulSoup object of the webpage
|
|
maxChars: Maximum number of characters
|
|
|
|
Returns:
|
|
Extracted main content as a string
|
|
"""
|
|
if not soup:
|
|
return ""
|
|
|
|
# Try to find main content elements in priority order
|
|
mainContent = None
|
|
for selector in ['main', 'article', '#content', '.content', '#main', '.main']:
|
|
content = soup.select_one(selector)
|
|
if content:
|
|
mainContent = content
|
|
break
|
|
|
|
# If no main content found, use the body
|
|
if not mainContent:
|
|
mainContent = soup.find('body') or soup
|
|
|
|
# Remove script, style, nav, footer elements that don't contribute to main content
|
|
for element in mainContent.select('script, style, nav, footer, header, aside, .sidebar, #sidebar, .comments, #comments, .advertisement, .ads, iframe'):
|
|
element.extract()
|
|
|
|
# Extract text content
|
|
textContent = mainContent.get_text(separator=' ', strip=True)
|
|
|
|
# Limit to maxChars
|
|
return textContent[:maxChars]
|
|
|
|
def _limitText(self, text: str, maxChars: int = 10000) -> str:
|
|
"""
|
|
Limit text to a maximum number of characters.
|
|
|
|
Args:
|
|
text: Input text
|
|
maxChars: Maximum number of characters
|
|
|
|
Returns:
|
|
Limited text
|
|
"""
|
|
if not text:
|
|
return ""
|
|
|
|
# If text is already under the limit, return unchanged
|
|
if len(text) <= maxChars:
|
|
return text
|
|
|
|
# Otherwise limit text to maxChars
|
|
return text[:maxChars] + "... [Content truncated due to length]"
|
|
|
|
|
|
# Factory function for the Webcrawler agent
|
|
def getAgentWebcrawler():
|
|
"""Returns an instance of the Webcrawler agent."""
|
|
return AgentWebcrawler() |