""" Web crawl service for handling web research operations. Manages the two-step process: WEB_SEARCH then WEB_CRAWL. """ import json import logging from typing import Dict, Any, List, Optional from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl logger = logging.getLogger(__name__) class WebService: """Service for web search and crawling operations.""" def __init__(self, services): """Initialize webcrawl service with service center access.""" self.services = services async def performWebResearch( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general", operationId: str = None ) -> Dict[str, Any]: """ Perform web research in two steps: 1. Use AI to analyze prompt and extract parameters + URLs 2. Call WEB_SEARCH to get URLs (if needed) 3. Combine URLs and filter to maxNumberPages 4. Call WEB_CRAWL for each URL 5. Return consolidated result Args: prompt: Natural language research prompt urls: Optional list of URLs provided by user country: Optional country code language: Optional language code operationId: Operation ID for progress tracking Returns: Consolidated research results as dictionary """ try: # Step 1: AI intention analysis - extract URLs and parameters from prompt self.services.workflow.progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) # Extract parameters from AI analysis instruction = analysisResult.get("instruction", prompt) extractedUrls = analysisResult.get("urls", []) needsSearch = analysisResult.get("needsSearch", True) # Default to True maxNumberPages = analysisResult.get("maxNumberPages", 10) countryCode = analysisResult.get("country", country) languageCode = analysisResult.get("language", language) finalResearchDepth = analysisResult.get("researchDepth", researchDepth) logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}") # Combine URLs (from user + from prompt extraction) allUrls = [] if urls: allUrls.extend(urls) if extractedUrls: allUrls.extend(extractedUrls) # Step 2: Search for URLs if needed (based on needsSearch flag) if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): self.services.workflow.progressLogUpdate(operationId, 0.3, "Searching for URLs") searchUrls = await self._performWebSearch( instruction=instruction, maxNumberPages=maxNumberPages - len(allUrls), country=countryCode, language=languageCode ) # Add search URLs to the list allUrls.extend(searchUrls) self.services.workflow.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering) if len(allUrls) > maxNumberPages: allUrls = allUrls[:maxNumberPages] logger.info(f"Limited URLs to {maxNumberPages}") if not allUrls: return {"error": "No URLs found to crawl"} # Step 4: Translate researchDepth to maxDepth depthMap = {"fast": 1, "general": 2, "deep": 3} maxDepth = depthMap.get(finalResearchDepth.lower(), 2) # Step 5: Crawl all URLs self.services.workflow.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") crawlResult = await self._performWebCrawl( instruction=instruction, urls=allUrls, maxDepth=maxDepth ) self.services.workflow.progressLogUpdate(operationId, 0.9, "Consolidating results") # Return consolidated result return { 
"instruction": instruction, "urls_crawled": allUrls, "total_urls": len(allUrls), "results": crawlResult, "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1 } except Exception as e: logger.error(f"Error in web research: {str(e)}") raise async def _analyzeResearchIntent( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general" ) -> Dict[str, Any]: """ Use AI to analyze prompt and extract: - URLs from the prompt text - Research instruction - maxNumberPages, timeRange, country, language from context """ # Build analysis prompt for AI analysisPrompt = f"""Analyze this web research request and extract structured information. RESEARCH REQUEST: {prompt} USER PROVIDED: - URLs: {json.dumps(urls) if urls else "None"} - Country: {country or "Not specified"} - Language: {language or "Not specified"} Extract and provide a JSON response with: 1. instruction: The core research instruction (cleaned prompt without URLs) 2. urls: List of URLs found in the prompt text 3. needsSearch: true if web search is needed to identify url's to crawl, false if only crawling of provided URLs is wanted 4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20) 5. country: Country code if identified in the prompt (2-digit lowercase, e.g., ch, us, de) 6. language: Language identified from the prompt (lowercase, e.g., de, en, fr) 7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3) Return ONLY valid JSON, no additional text: {{ "instruction": "cleaned research instruction", "urls": ["url1", "url2"], "needsSearch": true, "maxNumberPages": 10, "country": "ch", "language": "de", "researchDepth": "general" }}""" try: # Call AI planning to analyze intent analysisJson = await self.services.ai.callAiPlanning(analysisPrompt) # Parse JSON response result = json.loads(analysisJson) logger.info(f"Intent analysis result: {result}") return result except Exception as e: logger.warning(f"Error in AI intent analysis: {str(e)}") # Fallback to basic extraction return { "instruction": prompt, "urls": [], "needsSearch": True, "maxNumberPages": 10, "country": country, "language": language, "researchDepth": researchDepth } async def _performWebSearch( self, instruction: str, maxNumberPages: int, country: Optional[str], language: Optional[str] ) -> List[str]: """Perform web search to find URLs.""" try: # Build search prompt model searchPromptModel = AiCallPromptWebSearch( instruction=instruction, country=country, maxNumberPages=maxNumberPages, language=language ) searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2) # Call AI with WEB_SEARCH operation searchOptions = AiCallOptions( operationType=OperationTypeEnum.WEB_SEARCH, resultFormat="json" ) searchResult = await self.services.ai.callAiDocuments( prompt=searchPrompt, documents=None, options=searchOptions, outputFormat="json" ) # Parse and extract URLs if isinstance(searchResult, str): searchData = json.loads(searchResult) else: searchData = searchResult # Extract URLs from response urls = [] if isinstance(searchData, dict): if "urls" in searchData: urls = searchData["urls"] elif "results" in searchData: urls = [r.get("url") for r in searchData["results"] if r.get("url")] elif isinstance(searchData, list): urls = [item.get("url") for item in searchData if item.get("url")] logger.info(f"Web search returned {len(urls)} 

    async def _performWebCrawl(
        self,
        instruction: str,
        urls: List[str],
        maxDepth: int = 2
    ) -> List[Dict[str, Any]]:
        """Perform a web crawl over a list of URLs - calls the plugin for each URL individually."""
        crawlResults = []

        # Loop over the URLs and crawl one at a time
        for url in urls:
            try:
                logger.info(f"Crawling URL: {url}")

                # Build the crawl prompt model for a single URL
                crawlPromptModel = AiCallPromptWebCrawl(
                    instruction=instruction,
                    url=url,  # Single URL
                    maxDepth=maxDepth,
                    maxWidth=50
                )
                crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2)

                # Call AI with the WEB_CRAWL operation
                crawlOptions = AiCallOptions(
                    operationType=OperationTypeEnum.WEB_CRAWL,
                    resultFormat="json"
                )
                crawlResult = await self.services.ai.callAiDocuments(
                    prompt=crawlPrompt,
                    documents=None,
                    options=crawlOptions,
                    outputFormat="json"
                )

                # Parse the crawl result; keep the raw text if it is not valid JSON
                if isinstance(crawlResult, str):
                    try:
                        crawlData = json.loads(crawlResult)
                    except json.JSONDecodeError:
                        crawlData = {"url": url, "content": crawlResult}
                else:
                    crawlData = crawlResult

                # Normalize everything into a flat list of results
                if isinstance(crawlData, list):
                    crawlResults.extend(crawlData)
                elif isinstance(crawlData, dict):
                    if "results" in crawlData:
                        crawlResults.extend(crawlData["results"])
                    else:
                        crawlResults.append(crawlData)
                else:
                    crawlResults.append({"url": url, "content": str(crawlData)})

            except Exception as e:
                logger.error(f"Error crawling URL {url}: {str(e)}")
                crawlResults.append({"url": url, "error": str(e)})

        return crawlResults
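

if __name__ == "__main__":
    # Minimal smoke-test sketch. The service container below is a stub
    # (the real container exposing `workflow` and `ai` is wired elsewhere in
    # the application); it only mimics the calls this module makes, and the
    # project's datamodels must be importable for the crawl step to run.
    import asyncio
    from types import SimpleNamespace

    class _StubWorkflow:
        def progressLogUpdate(self, operationId, progress, message):
            print(f"[{progress:.0%}] {message}")

    class _StubAi:
        async def callAiPlanning(self, prompt):
            # Pretend the AI decided only the provided URL should be crawled.
            return json.dumps({
                "instruction": "Summarize the page",
                "urls": ["https://example.com"],
                "needsSearch": False,
                "maxNumberPages": 1,
                "researchDepth": "fast"
            })

        async def callAiDocuments(self, prompt, documents, options, outputFormat):
            # Canned crawl result for the single stub URL.
            return json.dumps({"url": "https://example.com", "content": "stub content"})

    async def _demo():
        services = SimpleNamespace(workflow=_StubWorkflow(), ai=_StubAi())
        web = WebService(services)
        result = await web.performWebResearch(
            prompt="Summarize https://example.com",
            urls=[],
            country=None,
            language=None,
            researchDepth="fast",
            operationId="demo-op"
        )
        print(json.dumps(result, indent=2))

    asyncio.run(_demo())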