# gateway/modules/services/serviceWeb/mainServiceWeb.py
#
# Repository listing metadata: 355 lines, 15 KiB, Python

"""
Web crawl service for handling web research operations.
Manages the two-step process: WEB_SEARCH then WEB_CRAWL.
"""
import json
import logging
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl
logger = logging.getLogger(__name__)


class WebService:
    """Service for web search and crawling operations.

    Research runs as a pipeline: an AI planning call turns the free-text
    prompt into structured parameters, an optional WEB_SEARCH call finds
    URLs, and a WEB_CRAWL call is issued for every URL that survives
    de-duplication and the page limit.
    """

    # Crawl depth handed to WEB_CRAWL for each research depth label.
    _DEPTH_BY_RESEARCH_LEVEL = {"fast": 1, "general": 2, "deep": 3}
    # Used when the AI analysis gives no usable page limit / depth label.
    _DEFAULT_MAX_PAGES = 10
    _DEFAULT_MAX_DEPTH = 2

    def __init__(self, services):
        """Initialize webcrawl service with service center access."""
        self.services = services

    @staticmethod
    def _coercePageLimit(value: Any, default: int = 10) -> int:
        """Return value as a positive int, or default when it is unusable."""
        try:
            limit = int(value)
        except (TypeError, ValueError, OverflowError):
            return default
        return limit if limit > 0 else default

    @staticmethod
    def _mergeUrls(*sources: Any) -> List[str]:
        """Merge URL lists into one list without duplicates, keeping first-seen order.

        Sources that are not lists/tuples and entries that are not non-blank
        strings are ignored, so malformed AI output cannot poison the result.
        """
        merged: Dict[str, None] = {}
        for source in sources:
            if not isinstance(source, (list, tuple)):
                continue
            for candidate in source:
                if isinstance(candidate, str) and candidate.strip():
                    merged.setdefault(candidate, None)
        return list(merged)

    @staticmethod
    def _extractSearchUrls(searchData: Any) -> List[str]:
        """Pull URL strings out of a parsed WEB_SEARCH response.

        Accepts a dict with a "urls" or "results" list, or a bare list; each
        entry may be a URL string or a dict carrying a "url" key. Anything
        else is skipped instead of raising.
        """
        if isinstance(searchData, dict):
            if "urls" in searchData:
                entries = searchData["urls"]
            elif "results" in searchData:
                entries = searchData["results"]
            else:
                entries = []
        else:
            entries = searchData
        if not isinstance(entries, list):
            return []

        found: List[str] = []
        for entry in entries:
            if isinstance(entry, dict):
                entry = entry.get("url")
            if isinstance(entry, str) and entry:
                found.append(entry)
        return found

    def _parseJsonText(self, rawText: str) -> Any:
        """Parse an AI text response as JSON (handles markdown code blocks).

        Raises whatever json.loads raises when the text is not valid JSON.
        """
        extractedJson = self.services.utils.jsonExtractString(rawText)
        return json.loads(extractedJson) if extractedJson else json.loads(rawText)

    def _writeDebugPayload(self, payload: Any, label: str) -> None:
        """Persist a prompt/response for debugging under the given label."""
        if isinstance(payload, str):
            text = payload
        else:
            # default=str: a debug dump must never abort the real operation
            # just because the response holds a non-JSON-serializable object.
            text = json.dumps(payload, indent=2, default=str)
        self.services.utils.writeDebugFile(text, label)

    async def performWebResearch(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general",
        operationId: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Perform web research:
        1. Use AI to analyze prompt and extract parameters + URLs
        2. Call WEB_SEARCH to get URLs (if needed)
        3. Combine URLs (de-duplicated) and filter to maxNumberPages
        4. Call WEB_CRAWL for each URL
        5. Return consolidated result

        Args:
            prompt: Natural language research prompt
            urls: Optional list of URLs provided by user
            country: Optional country code
            language: Optional language code
            researchDepth: "fast", "general" or "deep"; used when the AI
                analysis does not provide a depth of its own
            operationId: Operation ID for progress tracking

        Returns:
            Consolidated research results as dictionary, or
            {"error": "No URLs found to crawl"} when nothing can be crawled.

        Raises:
            Exception: any unexpected error is logged and re-raised.
        """
        try:
            # Step 1: AI intention analysis - extract URLs and parameters from prompt
            self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent")
            analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth)
            if not isinstance(analysisResult, dict):
                analysisResult = {}

            # The analysis is model output: every field may be missing, null
            # or of the wrong type, so each one is validated before use.
            instruction = analysisResult.get("instruction")
            if not isinstance(instruction, str) or not instruction.strip():
                instruction = prompt
            extractedUrls = self._mergeUrls(analysisResult.get("urls"))
            needsSearch = analysisResult.get("needsSearch")
            needsSearch = True if needsSearch is None else bool(needsSearch)  # Default to True
            maxNumberPages = self._coercePageLimit(
                analysisResult.get("maxNumberPages"), self._DEFAULT_MAX_PAGES
            )
            countryCode = analysisResult.get("country") or country
            languageCode = analysisResult.get("language") or language
            finalResearchDepth = analysisResult.get("researchDepth")
            if not isinstance(finalResearchDepth, str) or not finalResearchDepth.strip():
                finalResearchDepth = researchDepth
            suggestedFilename = analysisResult.get("filename")
            logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}, filename={suggestedFilename}")

            # Combine URLs (from user + from prompt extraction). The model is
            # shown the user URLs and tends to echo them, so drop duplicates
            # to avoid crawling the same page twice.
            allUrls = self._mergeUrls(urls, extractedUrls)

            # Step 2: Search for URLs if needed (based on needsSearch flag)
            if needsSearch and len(allUrls) < maxNumberPages:
                self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs")
                searchUrls = await self._performWebSearch(
                    instruction=instruction,
                    maxNumberPages=maxNumberPages - len(allUrls),
                    country=countryCode,
                    language=languageCode
                )
                # Add search URLs to the list (again without duplicates)
                allUrls = self._mergeUrls(allUrls, searchUrls)
                self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs")

            # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering)
            if len(allUrls) > maxNumberPages:
                allUrls = allUrls[:maxNumberPages]
                logger.info(f"Limited URLs to {maxNumberPages}")
            if not allUrls:
                return {"error": "No URLs found to crawl"}

            # Step 4: Translate researchDepth to maxDepth
            depthKey = finalResearchDepth.strip().lower() if isinstance(finalResearchDepth, str) else ""
            maxDepth = self._DEPTH_BY_RESEARCH_LEVEL.get(depthKey, self._DEFAULT_MAX_DEPTH)

            # Step 5: Crawl all URLs
            self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs")
            crawlResult = await self._performWebCrawl(
                instruction=instruction,
                urls=allUrls,
                maxDepth=maxDepth
            )
            self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results")

            # Return consolidated result
            result = {
                "instruction": instruction,
                "urls_crawled": allUrls,
                "total_urls": len(allUrls),
                "results": crawlResult,
                "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1
            }
            # Add suggested filename if available
            if suggestedFilename:
                result["suggested_filename"] = suggestedFilename
            return result
        except Exception as e:
            logger.error(f"Error in web research: {str(e)}")
            raise

    async def _analyzeResearchIntent(
        self,
        prompt: str,
        urls: List[str],
        country: Optional[str],
        language: Optional[str],
        researchDepth: str = "general"
    ) -> Dict[str, Any]:
        """
        Use AI to analyze prompt and extract:
        - URLs from the prompt text
        - Research instruction
        - maxNumberPages, country, language, researchDepth and a filename

        Returns:
            The parsed JSON object from the AI. If the call fails or the reply
            is not a JSON object, a fallback dict built from the arguments is
            returned instead (search enabled, 10 pages); this method does not
            raise.
        """
        # Build analysis prompt for AI
        analysisPrompt = f"""Analyze this web research request and extract structured information.
RESEARCH REQUEST:
{prompt}
USER PROVIDED:
- URLs: {json.dumps(urls) if urls else "None"}
- Country: {country or "Not specified"}
- Language: {language or "Not specified"}
Extract and provide a JSON response with:
1. instruction: Formulate directly, WHAT you want to find on the web. Do not include URLs in the instruction. Good example: "What is the company Xyz doing?". Bad example: "Conduct web research on the company Xyz"
2. urls: Put list of URLs found in the prompt text, and URL's you know, that are relevant to the research
3. needsSearch: true if web search is needed to identify url's to crawl, false if only crawling of provided URLs is wanted
4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20)
5. country: Country code if identified in the prompt (2-digit lowercase, e.g., ch, us, de)
6. language: Language identified from the prompt (lowercase, e.g., de, en, fr)
7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3)
8. filename: Generate a concise, descriptive filename (without extension) for the research results. Should be short (max 50 characters), descriptive of the research topic, use underscores instead of spaces, and only contain alphanumeric characters and underscores. Example: "WebResearch_Topic_Context"
Return ONLY valid JSON, no additional text:
{{
"instruction": "research instruction",
"urls": ["url1", "url2"],
"needsSearch": true,
"maxNumberPages": 10,
"country": "ch",
"language": "en",
"researchDepth": "general",
"filename": "descriptive_filename_without_extension"
}}"""
        try:
            # Call AI planning to analyze intent
            analysisJson = await self.services.ai.callAiPlanning(
                analysisPrompt,
                debugType="webresearchintent"
            )
            # Extract JSON from response (handles markdown code blocks)
            extractedJson = self.services.utils.jsonExtractString(analysisJson)
            if not extractedJson:
                raise ValueError("No JSON found in AI response")
            # Parse JSON response
            result = json.loads(extractedJson)
            # Callers read the result with .get(); a JSON list/string/number
            # would crash them, so treat it like any other bad reply.
            if not isinstance(result, dict):
                raise ValueError("AI response is not a JSON object")
            logger.info(f"Intent analysis result: {result}")
            return result
        except Exception as e:
            logger.warning(f"Error in AI intent analysis: {str(e)}")
            # Fallback to basic extraction
            return {
                "instruction": prompt,
                "urls": [],
                "needsSearch": True,
                "maxNumberPages": 10,
                "country": country,
                "language": language,
                "researchDepth": researchDepth,
                "filename": None
            }

    async def _performWebSearch(
        self,
        instruction: str,
        maxNumberPages: int,
        country: Optional[str],
        language: Optional[str]
    ) -> List[str]:
        """Perform web search to find URLs.

        Returns:
            The URLs found in the WEB_SEARCH response; an empty list when the
            search fails for any reason (the error is logged, not raised).
        """
        try:
            # Build search prompt model
            searchPromptModel = AiCallPromptWebSearch(
                instruction=instruction,
                country=country,
                maxNumberPages=maxNumberPages,
                language=language
            )
            searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2)
            # Debug: persist search prompt
            self.services.utils.writeDebugFile(searchPrompt, "websearch_prompt")

            # Call AI with WEB_SEARCH operation
            searchOptions = AiCallOptions(
                operationType=OperationTypeEnum.WEB_SEARCH,
                resultFormat="json"
            )
            searchResult = await self.services.ai.callAiDocuments(
                prompt=searchPrompt,
                documents=None,
                options=searchOptions,
                outputFormat="json"
            )
            # Debug: persist search response
            self._writeDebugPayload(searchResult, "websearch_response")

            # Parse (text responses only) and extract URLs
            if isinstance(searchResult, str):
                searchData = self._parseJsonText(searchResult)
            else:
                searchData = searchResult
            foundUrls = self._extractSearchUrls(searchData)
            logger.info(f"Web search returned {len(foundUrls)} URLs")
            return foundUrls
        except Exception as e:
            logger.error(f"Error in web search: {str(e)}")
            return []

    async def _performWebCrawl(
        self,
        instruction: str,
        urls: List[str],
        maxDepth: int = 2
    ) -> List[Dict[str, Any]]:
        """Perform web crawl on list of URLs - calls plugin for each URL individually.

        Returns:
            A flat list of result entries. A URL whose crawl raises contributes
            {"url": ..., "error": ...} so that one failure does not abort the
            remaining URLs.
        """
        crawlResults: List[Dict[str, Any]] = []
        # Loop over each URL and crawl one at a time
        for url in urls:
            try:
                logger.info(f"Crawling URL: {url}")
                # Build crawl prompt model for single URL
                crawlPromptModel = AiCallPromptWebCrawl(
                    instruction=instruction,
                    url=url,  # Single URL
                    maxDepth=maxDepth,
                    maxWidth=50
                )
                crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)
                # Debug: persist crawl prompt (with URL identifier in content for clarity)
                debugPrompt = f"URL: {url}\n\n{crawlPrompt}"
                self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt")

                # Call AI with WEB_CRAWL operation
                crawlOptions = AiCallOptions(
                    operationType=OperationTypeEnum.WEB_CRAWL,
                    resultFormat="json"
                )
                crawlResult = await self.services.ai.callAiDocuments(
                    prompt=crawlPrompt,
                    documents=None,
                    options=crawlOptions,
                    outputFormat="json"
                )
                # Debug: persist crawl response
                self._writeDebugPayload(crawlResult, "webcrawl_response")

                # Parse crawl result
                if isinstance(crawlResult, str):
                    try:
                        crawlData = self._parseJsonText(crawlResult)
                    except Exception:
                        # Not JSON: keep the raw text as the page content.
                        # (Exception, not a bare except, so KeyboardInterrupt
                        # and SystemExit still propagate.)
                        crawlData = {"url": url, "content": crawlResult}
                else:
                    crawlData = crawlResult

                # Ensure it's a list of results
                if isinstance(crawlData, list):
                    crawlResults.extend(crawlData)
                elif isinstance(crawlData, dict):
                    nestedResults = crawlData.get("results")
                    if isinstance(nestedResults, list):
                        crawlResults.extend(nestedResults)
                    else:
                        # No usable "results" list: keep the dict itself
                        # (extending with a str would add single characters).
                        crawlResults.append(crawlData)
                else:
                    crawlResults.append({"url": url, "content": str(crawlData)})
            except Exception as e:
                logger.error(f"Error crawling URL {url}: {str(e)}")
                crawlResults.append({"url": url, "error": str(e)})
        return crawlResults