""" Web crawl service for handling web research operations. Manages the two-step process: WEB_SEARCH then WEB_CRAWL. """ import json import logging from typing import Dict, Any, List, Optional from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl logger = logging.getLogger(__name__) class WebService: """Service for web search and crawling operations.""" def __init__(self, services): """Initialize webcrawl service with service center access.""" self.services = services async def performWebResearch( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general", operationId: str = None ) -> Dict[str, Any]: """ Perform web research in two steps: 1. Use AI to analyze prompt and extract parameters + URLs 2. Call WEB_SEARCH to get URLs (if needed) 3. Combine URLs and filter to maxNumberPages 4. Call WEB_CRAWL for each URL 5. Return consolidated result Args: prompt: Natural language research prompt urls: Optional list of URLs provided by user country: Optional country code language: Optional language code operationId: Operation ID for progress tracking Returns: Consolidated research results as dictionary """ try: # Step 1: AI intention analysis - extract URLs and parameters from prompt self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) # Extract parameters from AI analysis instruction = analysisResult.get("instruction", prompt) extractedUrls = analysisResult.get("urls", []) needsSearch = analysisResult.get("needsSearch", True) # Default to True maxNumberPages = analysisResult.get("maxNumberPages", 10) countryCode = analysisResult.get("country", country) languageCode = analysisResult.get("language", language) finalResearchDepth = analysisResult.get("researchDepth", researchDepth) suggestedFilename = analysisResult.get("filename", None) logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}, filename={suggestedFilename}") # Combine URLs (from user + from prompt extraction) allUrls = [] if urls: allUrls.extend(urls) if extractedUrls: allUrls.extend(extractedUrls) # Step 2: Search for URLs if needed (based on needsSearch flag) if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs") searchUrls = await self._performWebSearch( instruction=instruction, maxNumberPages=maxNumberPages - len(allUrls), country=countryCode, language=languageCode ) # Add search URLs to the list allUrls.extend(searchUrls) self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering) if len(allUrls) > maxNumberPages: allUrls = allUrls[:maxNumberPages] logger.info(f"Limited URLs to {maxNumberPages}") if not allUrls: return {"error": "No URLs found to crawl"} # Step 4: Translate researchDepth to maxDepth depthMap = {"fast": 1, "general": 2, "deep": 3} maxDepth = depthMap.get(finalResearchDepth.lower(), 2) # Step 5: Crawl all URLs self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") crawlResult = await self._performWebCrawl( instruction=instruction, urls=allUrls, maxDepth=maxDepth ) self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results") # Return consolidated result result = { "instruction": instruction, "urls_crawled": allUrls, "total_urls": len(allUrls), "results": crawlResult, "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1 } # Add suggested filename if available if suggestedFilename: result["suggested_filename"] = suggestedFilename return result except Exception as e: logger.error(f"Error in web research: {str(e)}") raise async def _analyzeResearchIntent( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general" ) -> Dict[str, Any]: """ Use AI to analyze prompt and extract: - URLs from the prompt text - Research instruction - maxNumberPages, timeRange, country, language from context """ # Build analysis prompt for AI analysisPrompt = f"""Analyze this web research request and extract structured information. RESEARCH REQUEST: {prompt} USER PROVIDED: - URLs: {json.dumps(urls) if urls else "None"} - Country: {country or "Not specified"} - Language: {language or "Not specified"} Extract and provide a JSON response with: 1. instruction: Formulate directly, WHAT you want to find on the web. Do not include URLs in the instruction. Good example: "What is the company Xyz doing?". Bad example: "Conduct web research on the company Xyz" 2. urls: Put list of URLs found in the prompt text, and URL's you know, that are relevant to the research 3. needsSearch: true if web search is needed to identify url's to crawl, false if only crawling of provided URLs is wanted 4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20) 5. country: Country code if identified in the prompt (2-digit lowercase, e.g., ch, us, de) 6. language: Language identified from the prompt (lowercase, e.g., de, en, fr) 7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3) 8. filename: Generate a concise, descriptive filename (without extension) for the research results. Should be short (max 50 characters), descriptive of the research topic, use underscores instead of spaces, and only contain alphanumeric characters and underscores. Example: "WebResearch_Topic_Context" Return ONLY valid JSON, no additional text: {{ "instruction": "research instruction", "urls": ["url1", "url2"], "needsSearch": true, "maxNumberPages": 10, "country": "ch", "language": "en", "researchDepth": "general", "filename": "descriptive_filename_without_extension" }}""" try: # Call AI planning to analyze intent analysisJson = await self.services.ai.callAiPlanning( analysisPrompt, debugType="webresearchintent" ) # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(analysisJson) if not extractedJson: raise ValueError("No JSON found in AI response") # Parse JSON response result = json.loads(extractedJson) logger.info(f"Intent analysis result: {result}") return result except Exception as e: logger.warning(f"Error in AI intent analysis: {str(e)}") # Fallback to basic extraction return { "instruction": prompt, "urls": [], "needsSearch": True, "maxNumberPages": 10, "country": country, "language": language, "researchDepth": researchDepth, "filename": None } async def _performWebSearch( self, instruction: str, maxNumberPages: int, country: Optional[str], language: Optional[str] ) -> List[str]: """Perform web search to find URLs.""" try: # Build search prompt model searchPromptModel = AiCallPromptWebSearch( instruction=instruction, country=country, maxNumberPages=maxNumberPages, language=language ) searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2) # Debug: persist search prompt self.services.utils.writeDebugFile(searchPrompt, "websearch_prompt") # Call AI with WEB_SEARCH operation searchOptions = AiCallOptions( operationType=OperationTypeEnum.WEB_SEARCH, resultFormat="json" ) # Use unified callAiContent method searchResponse = await self.services.ai.callAiContent( prompt=searchPrompt, options=searchOptions, outputFormat="json" ) # Extract content from AiResponse searchResult = searchResponse.content # Debug: persist search response if isinstance(searchResult, str): self.services.utils.writeDebugFile(searchResult, "websearch_response") else: self.services.utils.writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response") # Parse and extract URLs if isinstance(searchResult, str): # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(searchResult) searchData = json.loads(extractedJson) if extractedJson else json.loads(searchResult) else: searchData = searchResult # Extract URLs from response urls = [] if isinstance(searchData, dict): if "urls" in searchData: urls = searchData["urls"] elif "results" in searchData: urls = [r.get("url") for r in searchData["results"] if r.get("url")] elif isinstance(searchData, list): # Handle both cases: list of URL strings or list of dicts with "url" key for item in searchData: if isinstance(item, str): # Item is already a URL string urls.append(item) elif isinstance(item, dict) and item.get("url"): # Item is a dict with "url" key urls.append(item.get("url")) logger.info(f"Web search returned {len(urls)} URLs") return urls except Exception as e: logger.error(f"Error in web search: {str(e)}") return [] async def _performWebCrawl( self, instruction: str, urls: List[str], maxDepth: int = 2 ) -> List[Dict[str, Any]]: """Perform web crawl on list of URLs - calls plugin for each URL individually.""" crawlResults = [] # Loop over each URL and crawl one at a time for url in urls: try: logger.info(f"Crawling URL: {url}") # Build crawl prompt model for single URL crawlPromptModel = AiCallPromptWebCrawl( instruction=instruction, url=url, # Single URL maxDepth=maxDepth, maxWidth=50 ) crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) # Debug: persist crawl prompt (with URL identifier in content for clarity) debugPrompt = f"URL: {url}\n\n{crawlPrompt}" self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt") # Call AI with WEB_CRAWL operation crawlOptions = AiCallOptions( operationType=OperationTypeEnum.WEB_CRAWL, resultFormat="json" ) # Use unified callAiContent method crawlResponse = await self.services.ai.callAiContent( prompt=crawlPrompt, options=crawlOptions, outputFormat="json" ) # Extract content from AiResponse crawlResult = crawlResponse.content # Debug: persist crawl response if isinstance(crawlResult, str): self.services.utils.writeDebugFile(crawlResult, "webcrawl_response") else: self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") # Parse crawl result if isinstance(crawlResult, str): try: # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(crawlResult) crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult) except: crawlData = {"url": url, "content": crawlResult} else: crawlData = crawlResult # Ensure it's a list of results if isinstance(crawlData, list): crawlResults.extend(crawlData) elif isinstance(crawlData, dict): if "results" in crawlData: crawlResults.extend(crawlData["results"]) else: crawlResults.append(crawlData) else: crawlResults.append({"url": url, "content": str(crawlData)}) except Exception as e: logger.error(f"Error crawling URL {url}: {str(e)}") crawlResults.append({"url": url, "error": str(e)}) return crawlResults