"""
|
|
Web crawl service for handling web research operations.
|
|
Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


class WebService:
    """Service for web search and crawling operations."""

    def __init__(self, services):
        """Initialize webcrawl service with service center access."""
        self.services = services

    async def performWebResearch(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general",
        operationId: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Perform web research in the following steps:
        1. Use AI to analyze the prompt and extract parameters + URLs
        2. Call WEB_SEARCH to get URLs (if needed)
        3. Combine URLs and filter to maxNumberPages
        4. Call WEB_CRAWL for each URL
        5. Return the consolidated result

        Args:
            prompt: Natural language research prompt
            urls: Optional list of URLs provided by user
            country: Optional country code
            language: Optional language code
            researchDepth: Research depth ("fast", "general", or "deep")
            operationId: Operation ID for progress tracking

        Returns:
            Consolidated research results as dictionary
        """
        try:
            # Step 1: AI intention analysis - extract URLs and parameters from prompt
            self.services.workflow.progressLogUpdate(operationId, 0.1, "Analyzing research intent")

            analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth)

            # Extract parameters from AI analysis, falling back to the caller's values
            # ("or" also covers fields the AI omits or returns as null)
            instruction = analysisResult.get("instruction") or prompt
            extractedUrls = analysisResult.get("urls") or []
            needsSearch = analysisResult.get("needsSearch", True)  # Default to True
            maxNumberPages = analysisResult.get("maxNumberPages") or 10
            countryCode = analysisResult.get("country") or country
            languageCode = analysisResult.get("language") or language
            finalResearchDepth = analysisResult.get("researchDepth") or researchDepth

            logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}")

            # Combine URLs (from user + from prompt extraction)
            allUrls = []
            if urls:
                allUrls.extend(urls)
            if extractedUrls:
                allUrls.extend(extractedUrls)

            # Step 2: Search for URLs if needed (based on needsSearch flag)
            if needsSearch and (not allUrls or len(allUrls) < maxNumberPages):
                self.services.workflow.progressLogUpdate(operationId, 0.3, "Searching for URLs")

                searchUrls = await self._performWebSearch(
                    instruction=instruction,
                    maxNumberPages=maxNumberPages - len(allUrls),
                    country=countryCode,
                    language=languageCode
                )

                # Add search URLs to the list
                allUrls.extend(searchUrls)

            self.services.workflow.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")

            # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
            if len(allUrls) > maxNumberPages:
                allUrls = allUrls[:maxNumberPages]
                logger.info(f"Limited URLs to {maxNumberPages}")

            if not allUrls:
                return {"error": "No URLs found to crawl"}

            # Step 4: Translate researchDepth to maxDepth
            depthMap = {"fast": 1, "general": 2, "deep": 3}
            maxDepth = depthMap.get(finalResearchDepth.lower(), 2)

            # Step 5: Crawl all URLs
            self.services.workflow.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")

            crawlResult = await self._performWebCrawl(
                instruction=instruction,
                urls=allUrls,
                maxDepth=maxDepth
            )

            self.services.workflow.progressLogUpdate(operationId, 0.9, "Consolidating results")

            # Return consolidated result
            return {
                "instruction": instruction,
                "urls_crawled": allUrls,
                "total_urls": len(allUrls),
                "results": crawlResult,
                "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1
            }

        except Exception as e:
            logger.error(f"Error in web research: {str(e)}")
            raise

    async def _analyzeResearchIntent(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general"
    ) -> Dict[str, Any]:
        """
        Use AI to analyze the prompt and extract:
        - URLs from the prompt text
        - Research instruction
        - maxNumberPages, country, language and researchDepth from context
        """
        # Build analysis prompt for AI
        analysisPrompt = f"""Analyze this web research request and extract structured information.

RESEARCH REQUEST:
{prompt}

USER PROVIDED:
- URLs: {json.dumps(urls) if urls else "None"}
- Country: {country or "Not specified"}
- Language: {language or "Not specified"}

Extract and provide a JSON response with:
1. instruction: The core research instruction (cleaned prompt without URLs)
2. urls: List of URLs found in the prompt text
3. needsSearch: true if a web search is needed to identify URLs to crawl, false if only the provided URLs should be crawled
4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20)
5. country: Country code if identified in the prompt (2-letter lowercase, e.g., ch, us, de)
6. language: Language identified from the prompt (lowercase, e.g., de, en, fr)
7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3)

Return ONLY valid JSON, no additional text:
{{
    "instruction": "cleaned research instruction",
    "urls": ["url1", "url2"],
    "needsSearch": true,
    "maxNumberPages": 10,
    "country": "ch",
    "language": "de",
    "researchDepth": "general"
}}"""

        try:
            # Call AI planning to analyze intent
            analysisJson = await self.services.ai.callAiPlanning(analysisPrompt)

            # Parse JSON response
            result = json.loads(analysisJson)

            logger.info(f"Intent analysis result: {result}")
            return result

        except Exception as e:
            logger.warning(f"Error in AI intent analysis: {str(e)}")
            # Fallback to basic extraction
            return {
                "instruction": prompt,
                "urls": [],
                "needsSearch": True,
                "maxNumberPages": 10,
                "country": country,
                "language": language,
                "researchDepth": researchDepth
            }

    async def _performWebSearch(
        self,
        instruction: str,
        maxNumberPages: int,
        country: Optional[str],
        language: Optional[str]
    ) -> List[str]:
        """Perform web search to find URLs."""
        try:
            # Build search prompt model
            searchPromptModel = AiCallPromptWebSearch(
                instruction=instruction,
                country=country,
                maxNumberPages=maxNumberPages,
                language=language
            )
            searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2)

            # Call AI with WEB_SEARCH operation
            searchOptions = AiCallOptions(
                operationType=OperationTypeEnum.WEB_SEARCH,
                resultFormat="json"
            )

            searchResult = await self.services.ai.callAiDocuments(
                prompt=searchPrompt,
                documents=None,
                options=searchOptions,
                outputFormat="json"
            )

            # Parse and extract URLs
            if isinstance(searchResult, str):
                searchData = json.loads(searchResult)
            else:
                searchData = searchResult

            # Extract URLs from response
            urls = []
            if isinstance(searchData, dict):
                if "urls" in searchData:
                    urls = searchData["urls"]
                elif "results" in searchData:
                    urls = [r.get("url") for r in searchData["results"] if r.get("url")]
            elif isinstance(searchData, list):
                urls = [item.get("url") for item in searchData if item.get("url")]

            logger.info(f"Web search returned {len(urls)} URLs")
            return urls

        except Exception as e:
            logger.error(f"Error in web search: {str(e)}")
            return []

    async def _performWebCrawl(
        self,
        instruction: str,
        urls: List[str],
        maxDepth: int = 2
    ) -> List[Dict[str, Any]]:
        """Perform web crawl on a list of URLs - calls the plugin for each URL individually."""
        crawlResults = []

        # Loop over each URL and crawl one at a time
        for url in urls:
            try:
                logger.info(f"Crawling URL: {url}")

                # Build crawl prompt model for single URL
                crawlPromptModel = AiCallPromptWebCrawl(
                    instruction=instruction,
                    url=url,  # Single URL
                    maxDepth=maxDepth,
                    maxWidth=50
                )
                crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)

                # Call AI with WEB_CRAWL operation
                crawlOptions = AiCallOptions(
                    operationType=OperationTypeEnum.WEB_CRAWL,
                    resultFormat="json"
                )

                crawlResult = await self.services.ai.callAiDocuments(
                    prompt=crawlPrompt,
                    documents=None,
                    options=crawlOptions,
                    outputFormat="json"
                )

                # Parse crawl result
                if isinstance(crawlResult, str):
                    try:
                        crawlData = json.loads(crawlResult)
                    except json.JSONDecodeError:
                        # Not JSON - keep the raw text as content
                        crawlData = {"url": url, "content": crawlResult}
                else:
                    crawlData = crawlResult

                # Ensure it's a list of results
                if isinstance(crawlData, list):
                    crawlResults.extend(crawlData)
                elif isinstance(crawlData, dict):
                    if "results" in crawlData:
                        crawlResults.extend(crawlData["results"])
                    else:
                        crawlResults.append(crawlData)
                else:
                    crawlResults.append({"url": url, "content": str(crawlData)})

            except Exception as e:
                logger.error(f"Error crawling URL {url}: {str(e)}")
                crawlResults.append({"url": url, "error": str(e)})

        return crawlResults
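

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not called anywhere in this module).
# It assumes `services` is the application's service container exposing the
# `workflow.progressLogUpdate`, `ai.callAiPlanning` and `ai.callAiDocuments`
# methods used above; the prompt, URLs and operationId below are made-up
# example values.
# ---------------------------------------------------------------------------
async def _exampleWebResearch(services) -> None:
    """Sketch of how WebService.performWebResearch might be driven."""
    web = WebService(services)
    result = await web.performWebResearch(
        prompt="Summarize recent articles on data protection, see https://www.example.org/news",
        urls=["https://www.example.org/background"],  # optional user-provided URLs
        country="ch",
        language="de",
        researchDepth="general",       # mapped to maxDepth=2 for WEB_CRAWL
        operationId="op-example-001",  # hypothetical ID for progress tracking
    )
    # The consolidated result contains the cleaned instruction, the list of
    # crawled URLs and the per-URL crawl results.
    logger.info("Crawled %s URLs, %s results", result.get("total_urls"), result.get("total_results"))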