# Copyright (c) 2025 Patrick Motsch # All rights reserved. """ Web crawl service for handling web research operations. Manages the two-step process: WEB_SEARCH_DATA then WEB_CRAWL. """ import json import logging import time import asyncio from urllib.parse import urlparse from typing import Dict, Any, List, Optional from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl logger = logging.getLogger(__name__) class WebService: """Service for web search and crawling operations.""" def __init__(self, services): """Initialize webcrawl service with service center access.""" self.services = services async def performWebResearch( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general", operationId: str = None ) -> Dict[str, Any]: """ Perform web research in two steps: 1. Use AI to analyze prompt and extract parameters + URLs 2. Call WEB_SEARCH_DATA to get URLs (if needed) 3. Combine URLs and filter to maxNumberPages 4. Call WEB_CRAWL for each URL 5. Return consolidated result Args: prompt: Natural language research prompt urls: Optional list of URLs provided by user country: Optional country code language: Optional language code operationId: Operation ID for progress tracking Returns: Consolidated research results as dictionary """ # Start progress tracking if operationId provided if operationId: self.services.chat.progressLogStart( operationId, "Web Research", "Research", f"Depth: {researchDepth}" ) try: # Step 1: AI intention analysis - extract URLs and parameters from prompt if operationId: self.services.chat.progressLogUpdate(operationId, 0.1, "Analyzing research intent") analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) # Extract parameters from AI analysis instruction = analysisResult.get("instruction", prompt) extractedUrls = analysisResult.get("urls", []) needsSearch = analysisResult.get("needsSearch", True) # Default to True maxNumberPages = analysisResult.get("maxNumberPages", 10) countryCode = analysisResult.get("country", country) languageCode = analysisResult.get("language", language) finalResearchDepth = analysisResult.get("researchDepth", researchDepth) suggestedFilename = analysisResult.get("filename", None) logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}, filename={suggestedFilename}") # Combine URLs (from user + from prompt extraction) allUrls = [] if urls: allUrls.extend(urls) if extractedUrls: allUrls.extend(extractedUrls) # Step 2: Search for URLs and content if needed (based on needsSearch flag) searchUrls = [] searchResultsWithContent = [] if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): self.services.chat.progressLogUpdate(operationId, 0.3, "Searching for URLs and content") try: searchUrls, searchResultsWithContent = await self._performWebSearch( instruction=instruction, maxNumberPages=maxNumberPages - len(allUrls), country=countryCode, language=languageCode ) logger.info(f"Tavily search returned {len(searchUrls)} URLs with {len(searchResultsWithContent)} results containing content") except Exception as e: logger.error(f"Error performing Tavily search (continuing with other URLs): {str(e)}", exc_info=True) searchUrls = [] searchResultsWithContent = [] # Prioritize Tavily search URLs over AI-extracted URLs (they're more relevant) if searchUrls: # Prepend Tavily URLs to the list (they're more relevant) allUrls = searchUrls + allUrls logger.info(f"Using {len(searchUrls)} Tavily URLs + {len(allUrls) - len(searchUrls)} other URLs = {len(allUrls)} total") else: # If Tavily search failed, use AI-extracted URLs logger.warning("Tavily search returned no URLs, using AI-extracted URLs only") self.services.chat.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") # If we have search results (even without content), use them directly instead of crawling # Tavily search results are more relevant than generic AI-extracted URLs # Only crawl if we have NO search results at all if searchResultsWithContent and len(searchResultsWithContent) > 0: urls_with_actual_content = sum(1 for r in searchResultsWithContent if r.get("content") and len(r.get("content", "")) > 0) logger.info(f"Using {len(searchResultsWithContent)} Tavily search results ({urls_with_actual_content} with content) directly (skipping crawl)") # Convert search results to crawl result format crawlResult = [] for result in searchResultsWithContent: crawlResult.append({ "url": result["url"], "title": result.get("title", ""), "content": result.get("content", "") }) # Calculate statistics totalResults = len(crawlResult) totalContentLength = sum(len(r.get("content", "")) for r in crawlResult) urlsWithContent = sum(1 for r in crawlResult if r.get("content") and len(r.get("content", "")) > 0) # Log content availability if urlsWithContent == 0: logger.warning(f"Tavily search returned {len(searchResultsWithContent)} results but none have content - URLs will be used but may need crawling") else: logger.info(f"Tavily search provided content for {urlsWithContent}/{len(searchResultsWithContent)} URLs") # Even if content is empty, use these results - they're more relevant than generic URLs # The final answer generation can work with URLs even if content is empty # Convert to sections format sections = [] for idx, item in enumerate(crawlResult): section = { "id": f"result_{idx}", "content_type": "paragraph", "title": item.get("title") or item.get("url", f"Result {idx + 1}"), "order": idx } content = item.get("content", "") if content: section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") sections.append(section) # Return consolidated result result = { "metadata": { "title": suggestedFilename or instruction[:100] if instruction else "Web Research Results", "extraction_method": "tavily_search_direct", "research_depth": finalResearchDepth, "country": countryCode, "language": languageCode, "urls_searched": searchUrls[:20], "total_urls": len(searchUrls), "urls_with_content": urlsWithContent, "total_content_length": totalContentLength, "search_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None }, "sections": sections, "statistics": { "sectionCount": len(sections), "total_urls": len(searchUrls), "results_count": totalResults, "urls_with_content": urlsWithContent, "total_content_length": totalContentLength }, "instruction": instruction, "urls_crawled": searchUrls, "total_urls": len(searchUrls), "results": crawlResult, "total_results": totalResults } if suggestedFilename: result["suggested_filename"] = suggestedFilename result["metadata"]["suggested_filename"] = suggestedFilename if operationId: self.services.chat.progressLogUpdate(operationId, 0.9, "Completed") self.services.chat.progressLogFinish(operationId, True) return result # Step 3: Validate and filter URLs before crawling validatedUrls = self._validateUrls(allUrls) if not validatedUrls: logger.warning(f"All {len(allUrls)} URLs failed validation") return {"error": "No valid URLs found to crawl"} # Filter to maxNumberPages (simple cut, no intelligent filtering) if len(validatedUrls) > maxNumberPages: validatedUrls = validatedUrls[:maxNumberPages] logger.info(f"Limited URLs to {maxNumberPages}") if not validatedUrls: return {"error": "No URLs found to crawl"} # Step 4: Translate researchDepth to maxDepth and maxWidth depthMap = {"fast": 1, "general": 2, "deep": 3} maxDepth = depthMap.get(finalResearchDepth.lower(), 2) # Scale maxWidth based on research depth: fast=5, general=10, deep=20 pages per level widthMap = {"fast": 5, "general": 10, "deep": 20} maxWidth = widthMap.get(finalResearchDepth.lower(), 10) logger.info(f"Research depth settings: depth={finalResearchDepth}, maxDepth={maxDepth}, maxWidth={maxWidth}") # Step 5: Crawl all URLs with hierarchical logging if operationId: self.services.chat.progressLogUpdate(operationId, 0.4, "Initiating") self.services.chat.progressLogUpdate(operationId, 0.6, f"Crawling {len(validatedUrls)} URLs") # Use parent operation ID directly (parentId should be operationId, not log entry ID) parentOperationId = operationId # Use the parent's operationId directly crawlResult = await self._performWebCrawl( instruction=instruction, urls=validatedUrls, maxDepth=maxDepth, maxWidth=maxWidth, # Pass maxWidth to crawl function parentOperationId=parentOperationId ) if operationId: self.services.chat.progressLogUpdate(operationId, 0.9, "Consolidating results") self.services.chat.progressLogUpdate(operationId, 0.95, "Completed") self.services.chat.progressLogFinish(operationId, True) # Calculate statistics about crawl results totalResults = len(crawlResult) if isinstance(crawlResult, list) else 1 totalContentLength = 0 urlsWithContent = 0 # Analyze crawl results to gather statistics if isinstance(crawlResult, list): for item in crawlResult: if isinstance(item, dict): if item.get("url"): urlsWithContent += 1 content = item.get("content", "") if isinstance(content, str): totalContentLength += len(content) elif isinstance(content, dict): # Estimate size from dict totalContentLength += len(str(content)) elif isinstance(crawlResult, dict): if crawlResult.get("url"): urlsWithContent = 1 content = crawlResult.get("content", "") if isinstance(content, str): totalContentLength = len(content) elif isinstance(content, dict): totalContentLength = len(str(content)) # Convert crawl results into sections format for generic validator sections = [] if isinstance(crawlResult, list): for idx, item in enumerate(crawlResult): if isinstance(item, dict): section = { "id": f"result_{idx}", "content_type": "paragraph", "title": item.get("url", f"Result {idx + 1}"), "order": idx } # Add content preview content = item.get("content", "") if isinstance(content, str) and content: section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") sections.append(section) elif isinstance(crawlResult, dict): section = { "id": "result_0", "content_type": "paragraph", "title": crawlResult.get("url", "Research Result"), "order": 0 } content = crawlResult.get("content", "") if isinstance(content, str) and content: section["textPreview"] = content[:200] + ("..." if len(content) > 200 else "") sections.append(section) # Return consolidated result with metadata in format that generic validator understands result = { "metadata": { "title": suggestedFilename or instruction[:100] if instruction else "Web Research Results", "extraction_method": "web_crawl", "research_depth": finalResearchDepth, "max_depth": maxDepth, "country": countryCode, "language": languageCode, "urls_crawled": validatedUrls[:20], # First 20 URLs for reference "total_urls": len(validatedUrls), "urls_with_content": urlsWithContent, "total_content_length": totalContentLength, "crawl_date": self.services.utils.timestampGetUtc() if hasattr(self.services, 'utils') else None }, "sections": sections, "statistics": { "sectionCount": len(sections), "total_urls": len(validatedUrls), "results_count": totalResults, "urls_with_content": urlsWithContent, "total_content_length": totalContentLength }, # Keep original structure for backward compatibility "instruction": instruction, "urls_crawled": validatedUrls, "total_urls": len(validatedUrls), "results": crawlResult, "total_results": totalResults } # Add suggested filename if available if suggestedFilename: result["suggested_filename"] = suggestedFilename result["metadata"]["suggested_filename"] = suggestedFilename return result except Exception as e: logger.error(f"Error in web research: {str(e)}") if operationId: self.services.chat.progressLogFinish(operationId, False) raise async def _analyzeResearchIntent( self, prompt: str, urls: List[str], country: Optional[str], language: Optional[str], researchDepth: str = "general" ) -> Dict[str, Any]: """ Use AI to analyze prompt and extract: - URLs from the prompt text - Research instruction - maxNumberPages, timeRange, country, language from context """ # Build analysis prompt for AI analysisPrompt = f"""Analyze this web research request and extract structured information. RESEARCH REQUEST: {prompt} USER PROVIDED: - URLs: {json.dumps(urls) if urls else "None"} - Country: {country or "Not specified"} - Language: {language or "Not specified"} Extract and provide a JSON response with: 1. instruction: Formulate directly, WHAT you want to find on the web. Do not include URLs in the instruction. Good example: "What is the company Xyz doing?". Bad example: "Conduct web research on the company Xyz" 2. urls: Put list of URLs found in the prompt text, and URL's you know, that are relevant to the research 3. needsSearch: true if web search is needed to identify url's to crawl, false if only crawling of provided URLs is wanted 4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20) 5. country: Country code if identified in the prompt (2-digit lowercase, e.g., ch, us, de) 6. language: Language identified from the prompt (lowercase, e.g., de, en, fr) 7. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3) 8. filename: Generate a concise, descriptive filename (without extension) for the research results. Should be short (max 50 characters), descriptive of the research topic, use underscores instead of spaces, and only contain alphanumeric characters and underscores. Example: "WebResearch_Topic_Context" Return ONLY valid JSON, no additional text: {{ "instruction": "research instruction", "urls": ["url1", "url2"], "needsSearch": true, "maxNumberPages": 10, "country": "ch", "language": "en", "researchDepth": "general", "filename": "descriptive_filename_without_extension" }}""" try: # Call AI planning to analyze intent analysisJson = await self.services.ai.callAiPlanning( analysisPrompt, debugType="webresearchintent" ) # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(analysisJson) if not extractedJson: raise ValueError("No JSON found in AI response") # Parse JSON response result = json.loads(extractedJson) logger.info(f"Intent analysis result: {result}") return result except Exception as e: logger.warning(f"Error in AI intent analysis: {str(e)}") # Fallback to basic extraction return { "instruction": prompt, "urls": [], "needsSearch": True, "maxNumberPages": 10, "country": country, "language": language, "researchDepth": researchDepth, "filename": None } async def _performWebSearch( self, instruction: str, maxNumberPages: int, country: Optional[str], language: Optional[str] ) -> tuple[List[str], List[Dict[str, Any]]]: """ Perform web search to find URLs and content. Returns: Tuple of (urls, search_results_with_content) - urls: List of URL strings - search_results_with_content: List of dicts with url, title, content from Tavily search """ search_results_with_content = [] try: # Build search prompt model searchPromptModel = AiCallPromptWebSearch( instruction=instruction, country=country, maxNumberPages=maxNumberPages, language=language ) searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2) # Debug: persist search prompt self.services.utils.writeDebugFile(searchPrompt, "websearch_prompt") # Call AI with WEB_SEARCH_DATA operation searchOptions = AiCallOptions( operationType=OperationTypeEnum.WEB_SEARCH_DATA, resultFormat="json" ) # Use unified callAiContent method searchResponse = await self.services.ai.callAiContent( prompt=searchPrompt, options=searchOptions, outputFormat="json" ) # Check if metadata contains results with content (from Tavily) if hasattr(searchResponse, 'metadata') and searchResponse.metadata: # Check in additionalData first (where we store custom metadata) additional_data = None if hasattr(searchResponse.metadata, 'additionalData') and searchResponse.metadata.additionalData: additional_data = searchResponse.metadata.additionalData elif isinstance(searchResponse.metadata, dict): additional_data = searchResponse.metadata.get("additionalData", {}) if additional_data: results_with_content = additional_data.get("results_with_content", []) if results_with_content: logger.info(f"Found {len(results_with_content)} search results with content in metadata.additionalData") # Extract URLs and content from metadata for result in results_with_content: if result.get("url"): search_results_with_content.append({ "url": result.get("url"), "title": result.get("title", ""), "content": result.get("content", ""), "score": result.get("score", 0) }) # Also check directly in metadata (fallback) if not search_results_with_content: results_with_content = None if hasattr(searchResponse.metadata, 'results_with_content'): results_with_content = searchResponse.metadata.results_with_content elif isinstance(searchResponse.metadata, dict): results_with_content = searchResponse.metadata.get("results_with_content", []) if results_with_content: logger.info(f"Found {len(results_with_content)} search results with content in metadata (direct)") for result in results_with_content: if result.get("url"): search_results_with_content.append({ "url": result.get("url"), "title": result.get("title", ""), "content": result.get("content", ""), "score": result.get("score", 0) }) # Extract content from AiResponse searchResult = searchResponse.content logger.debug(f"Search response content type: {type(searchResult)}, length: {len(str(searchResult)) if searchResult else 0}") # Debug: persist search response if isinstance(searchResult, str): self.services.utils.writeDebugFile(searchResult, "websearch_response") logger.debug(f"Search response (first 500 chars): {searchResult[:500]}") else: self.services.utils.writeDebugFile(json.dumps(searchResult, indent=2), "websearch_response") logger.debug(f"Search response type: {type(searchResult)}, keys: {list(searchResult.keys()) if isinstance(searchResult, dict) else 'N/A'}") # Parse and extract URLs and content if isinstance(searchResult, str): # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(searchResult) if extractedJson: try: searchData = json.loads(extractedJson) logger.debug(f"Parsed JSON from extracted string, type: {type(searchData)}") except json.JSONDecodeError as e: logger.warning(f"Failed to parse extracted JSON: {e}, trying direct parse") searchData = json.loads(searchResult) else: try: searchData = json.loads(searchResult) logger.debug(f"Parsed JSON directly from string, type: {type(searchData)}") except json.JSONDecodeError as e: logger.error(f"Failed to parse search result as JSON: {e}") searchData = {} else: searchData = searchResult logger.debug(f"Using searchResult directly as searchData, type: {type(searchData)}") logger.debug(f"Final searchData type: {type(searchData)}, is dict: {isinstance(searchData, dict)}, keys: {list(searchData.keys()) if isinstance(searchData, dict) else 'N/A'}") # Extract URLs and content from response urls = [] if isinstance(searchData, dict): # Check for new format: {"urls": [...], "results": [...]} if "urls" in searchData and "results" in searchData: urls = searchData["urls"] # Extract results with content for r in searchData["results"]: if r.get("url"): # Only add if not already added from metadata if not any(sr["url"] == r.get("url") for sr in search_results_with_content): search_results_with_content.append({ "url": r.get("url"), "title": r.get("title", ""), "content": r.get("content", ""), "score": r.get("score", 0) }) logger.debug(f"Extracted {len(urls)} URLs and {len(search_results_with_content)} results with content from new format") elif "urls" in searchData: urls = searchData["urls"] logger.debug(f"Extracted {len(urls)} URLs from 'urls' field") elif "results" in searchData: # Extract URLs from results (content already in search_results_with_content if from metadata) for r in searchData["results"]: if r.get("url"): urls.append(r.get("url")) # Only add to search_results_with_content if not already added from metadata if not any(sr["url"] == r.get("url") for sr in search_results_with_content): search_results_with_content.append({ "url": r.get("url"), "title": r.get("title", ""), "content": r.get("raw_content") or r.get("content", ""), "score": r.get("score", 0) }) logger.debug(f"Extracted {len(urls)} URLs with content from 'results' field") else: logger.warning(f"Unexpected search data structure (dict): {list(searchData.keys())}") elif isinstance(searchData, list): # Handle both cases: list of URL strings or list of dicts with "url" key for item in searchData: if isinstance(item, str): # Item is already a URL string urls.append(item) elif isinstance(item, dict): if item.get("url"): urls.append(item.get("url")) # Only add to search_results_with_content if not already added from metadata if not any(sr["url"] == item.get("url") for sr in search_results_with_content): search_results_with_content.append({ "url": item.get("url"), "title": item.get("title", ""), "content": item.get("raw_content") or item.get("content", ""), "score": item.get("score", 0) }) logger.debug(f"Extracted {len(urls)} URLs from list") else: logger.warning(f"Unexpected search data type: {type(searchData)}") # If we got URLs but no content from metadata, extract URLs from search_results_with_content if urls and not search_results_with_content: # URLs were extracted but no content - create entries with empty content for url in urls: search_results_with_content.append({ "url": url, "title": "", "content": "", "score": 0 }) elif search_results_with_content and not urls: # We have content but no URLs - extract URLs from content results urls = [r["url"] for r in search_results_with_content] # If we have URLs but no search_results_with_content, create entries from URLs # This ensures we can use Tavily URLs even if content extraction failed if urls and not search_results_with_content: logger.warning("Got URLs from Tavily search but no content extracted - creating entries for direct use") for url in urls: search_results_with_content.append({ "url": url, "title": "", "content": "", # Empty content - will need crawling if used "score": 0 }) logger.info(f"Web search returned {len(urls)} URLs with {len(search_results_with_content)} results") if search_results_with_content: content_count = sum(1 for r in search_results_with_content if r.get("content") and len(r.get("content", "")) > 0) logger.info(f" - {content_count} results have content, {len(search_results_with_content) - content_count} without content") if content_count > 0: first_with_content = next((r for r in search_results_with_content if r.get("content")), None) if first_with_content: logger.info(f"Content preview from first result with content: {first_with_content.get('content', '')[:200]}") else: logger.warning("No search results extracted - will need to crawl URLs") return urls, search_results_with_content except Exception as e: logger.error(f"Error in web search: {str(e)}", exc_info=True) # Even if there's an error, try to extract URLs from the response if available recovered_urls = [] recovered_results = [] try: if 'searchResponse' in locals() and searchResponse: logger.info(f"Attempting to extract URLs from error response: {type(searchResponse)}") # Try to get content from response if hasattr(searchResponse, 'content'): errorContent = searchResponse.content if isinstance(errorContent, str): # Try to parse as JSON try: errorData = json.loads(errorContent) if isinstance(errorData, dict): if "urls" in errorData: recovered_urls = errorData["urls"] elif "results" in errorData: recovered_urls = [r.get("url") for r in errorData["results"] if r.get("url")] recovered_results = [{"url": r.get("url"), "title": r.get("title", ""), "content": r.get("content", ""), "score": 0} for r in errorData["results"]] elif isinstance(errorData, list): recovered_urls = [item if isinstance(item, str) else item.get("url", "") for item in errorData if item] if recovered_urls: logger.info(f"Recovered {len(recovered_urls)} URLs from error response") # Create entries for recovered URLs if not recovered_results: for url in recovered_urls: recovered_results.append({"url": url, "title": "", "content": "", "score": 0}) return recovered_urls, recovered_results except Exception as parseError: logger.debug(f"Failed to parse error response: {parseError}") except Exception as recoverError: logger.debug(f"Failed to recover URLs from error: {recoverError}") # If we have URLs from earlier extraction, return them if 'urls' in locals() and urls: logger.info(f"Returning {len(urls)} URLs extracted before error occurred") # Create entries from URLs results_from_urls = [{"url": url, "title": "", "content": "", "score": 0} for url in urls] return urls, results_from_urls return [], [] def _validateUrls(self, urls: List[str]) -> List[str]: """ Validate URLs before crawling - filters out invalid URLs. Args: urls: List of URLs to validate Returns: List of valid URLs """ validatedUrls = [] for url in urls: if not url or not isinstance(url, str): logger.debug(f"Skipping invalid URL (not a string): {url}") continue url = url.strip() if not url: logger.debug(f"Skipping empty URL") continue # Basic URL validation using urlparse try: parsed = urlparse(url) # Check if URL has at least scheme and netloc if not parsed.scheme or not parsed.netloc: logger.debug(f"Skipping invalid URL (missing scheme or netloc): {url}") continue # Only allow http/https schemes if parsed.scheme not in ['http', 'https']: logger.debug(f"Skipping URL with unsupported scheme '{parsed.scheme}': {url}") continue validatedUrls.append(url) logger.debug(f"Validated URL: {url}") except Exception as e: logger.warning(f"Error validating URL '{url}': {str(e)}") continue logger.info(f"Validated {len(validatedUrls)}/{len(urls)} URLs") return validatedUrls async def _performWebCrawl( self, instruction: str, urls: List[str], maxDepth: int = 2, maxWidth: int = 10, parentOperationId: Optional[str] = None ) -> List[Dict[str, Any]]: """Perform web crawl on list of URLs - crawls URLs in parallel for better performance.""" # Create tasks for parallel crawling crawlTasks = [] for urlIndex, url in enumerate(urls): task = self._crawlSingleUrl( url=url, urlIndex=urlIndex, totalUrls=len(urls), instruction=instruction, maxDepth=maxDepth, maxWidth=maxWidth, # Pass maxWidth to single URL crawl parentOperationId=parentOperationId ) crawlTasks.append(task) # Execute all crawl tasks in parallel logger.info(f"Starting parallel crawl of {len(urls)} URLs") crawlResults = await asyncio.gather(*crawlTasks, return_exceptions=True) # Process results and handle exceptions processedResults = [] for idx, result in enumerate(crawlResults): if isinstance(result, Exception): logger.error(f"Error crawling URL {urls[idx]}: {str(result)}") processedResults.append({"url": urls[idx], "error": str(result)}) else: processedResults.extend(result if isinstance(result, list) else [result]) logger.info(f"Completed parallel crawl: {len(processedResults)} results") return processedResults async def _crawlSingleUrl( self, url: str, urlIndex: int, totalUrls: int, instruction: str, maxDepth: int, maxWidth: int = 10, parentOperationId: Optional[str] = None ) -> List[Dict[str, Any]]: """ Crawl a single URL - called in parallel for multiple URLs. Args: url: URL to crawl urlIndex: Index of URL in the list totalUrls: Total number of URLs being crawled instruction: Research instruction maxDepth: Maximum crawl depth parentOperationId: Parent operation ID for progress tracking Returns: List of crawl results for this URL """ # Create separate operation for each URL with parent reference urlOperationId = None if parentOperationId: workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" urlOperationId = f"web_crawl_url_{workflowId}_{urlIndex}_{int(time.time())}" self.services.chat.progressLogStart( urlOperationId, "Web Crawl", f"URL {urlIndex + 1}/{totalUrls}", url[:50] + "..." if len(url) > 50 else url, parentOperationId=parentOperationId ) try: logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls}: {url}") if urlOperationId: displayUrl = url[:50] + "..." if len(url) > 50 else url self.services.chat.progressLogUpdate(urlOperationId, 0.2, f"Crawling: {displayUrl}") self.services.chat.progressLogUpdate(urlOperationId, 0.3, "Initiating crawl") # Build crawl prompt model for single URL # maxWidth is passed from performWebResearch based on researchDepth logger.info(f"Crawling URL {urlIndex + 1}/{totalUrls} with maxDepth={maxDepth}, maxWidth={maxWidth}") crawlPromptModel = AiCallPromptWebCrawl( instruction=instruction, url=url, # Single URL maxDepth=maxDepth, maxWidth=maxWidth # Scaled based on researchDepth: fast=5, general=10, deep=20 ) crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) # Debug: persist crawl prompt (with URL identifier in content for clarity) debugPrompt = f"URL: {url}\n\n{crawlPrompt}" self.services.utils.writeDebugFile(debugPrompt, "webcrawl_prompt") # Call AI with WEB_CRAWL operation crawlOptions = AiCallOptions( operationType=OperationTypeEnum.WEB_CRAWL, resultFormat="json" ) if urlOperationId: self.services.chat.progressLogUpdate(urlOperationId, 0.4, "Calling crawl connector") # Use unified callAiContent method with parentOperationId for hierarchical logging crawlResponse = await self.services.ai.callAiContent( prompt=crawlPrompt, options=crawlOptions, outputFormat="json", parentOperationId=urlOperationId # Pass URL operation ID as parent for sub-URL logging ) if urlOperationId: self.services.chat.progressLogUpdate(urlOperationId, 0.7, "Processing crawl results") # Extract content from AiResponse crawlResult = crawlResponse.content # Debug: persist crawl response if isinstance(crawlResult, str): self.services.utils.writeDebugFile(crawlResult, "webcrawl_response") else: self.services.utils.writeDebugFile(json.dumps(crawlResult, indent=2), "webcrawl_response") # Parse crawl result if isinstance(crawlResult, str): try: # Extract JSON from response (handles markdown code blocks) extractedJson = self.services.utils.jsonExtractString(crawlResult) crawlData = json.loads(extractedJson) if extractedJson else json.loads(crawlResult) except: crawlData = {"url": url, "content": crawlResult} else: crawlData = crawlResult # Process crawl results and create hierarchical progress logging for sub-URLs if urlOperationId: self.services.chat.progressLogUpdate(urlOperationId, 0.8, "Processing crawl results") # Recursively process crawl results to find nested URLs and create child operations processedResults = self._processCrawlResultsWithHierarchy(crawlData, url, urlOperationId, maxDepth, 0) # Count total URLs crawled (including sub-URLs) for progress message totalUrlsCrawled = self._countUrlsInResults(processedResults) # Ensure it's a list of results if isinstance(processedResults, list): results = processedResults elif isinstance(processedResults, dict): results = [processedResults] else: results = [{"url": url, "content": str(processedResults)}] if urlOperationId: if totalUrlsCrawled > 1: self.services.chat.progressLogUpdate(urlOperationId, 0.9, f"Crawled {totalUrlsCrawled} URLs (including sub-URLs)") else: self.services.chat.progressLogUpdate(urlOperationId, 0.9, "Crawl completed") self.services.chat.progressLogFinish(urlOperationId, True) return results except Exception as e: logger.error(f"Error crawling URL {url}: {str(e)}") if urlOperationId: self.services.chat.progressLogFinish(urlOperationId, False) return [{"url": url, "error": str(e)}] def _processCrawlResultsWithHierarchy( self, crawlData: Any, parentUrl: str, parentOperationId: Optional[str], maxDepth: int, currentDepth: int ) -> List[Dict[str, Any]]: """ Recursively process crawl results to create hierarchical progress logging for sub-URLs. Args: crawlData: Crawl result data (dict, list, or other) parentUrl: Parent URL being crawled parentOperationId: Parent operation ID for hierarchical logging maxDepth: Maximum crawl depth currentDepth: Current depth in the crawl tree Returns: List of processed crawl results """ import time results = [] # Handle list of results if isinstance(crawlData, list): for idx, item in enumerate(crawlData): if isinstance(item, dict): # Check if this item has sub-URLs or nested results itemUrl = item.get("url") or item.get("source") or parentUrl # Create child operation for sub-URL if we're not at max depth if currentDepth < maxDepth and parentOperationId: # Check if this item has nested results or children hasNestedResults = "results" in item or "children" in item or "subUrls" in item if hasNestedResults or (itemUrl != parentUrl and currentDepth > 0): # This is a sub-URL - create child operation workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}" subUrlOperationId = f"{parentOperationId}_sub_{idx}_{int(time.time())}" self.services.chat.progressLogStart( subUrlOperationId, "Crawling Sub-URL", f"Depth {currentDepth + 1}", itemUrl[:50] + "..." if len(itemUrl) > 50 else itemUrl, parentOperationId=parentOperationId ) try: # Process nested results recursively if "results" in item: nestedResults = self._processCrawlResultsWithHierarchy( item["results"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 ) item["results"] = nestedResults elif "children" in item: nestedResults = self._processCrawlResultsWithHierarchy( item["children"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 ) item["children"] = nestedResults elif "subUrls" in item: nestedResults = self._processCrawlResultsWithHierarchy( item["subUrls"], itemUrl, subUrlOperationId, maxDepth, currentDepth + 1 ) item["subUrls"] = nestedResults self.services.chat.progressLogUpdate(subUrlOperationId, 0.9, "Completed") self.services.chat.progressLogFinish(subUrlOperationId, True) except Exception as e: logger.error(f"Error processing sub-URL {itemUrl}: {str(e)}") if subUrlOperationId: self.services.chat.progressLogFinish(subUrlOperationId, False) results.append(item) else: results.append(item) # Handle dict with results array elif isinstance(crawlData, dict): if "results" in crawlData: # Process nested results nestedResults = self._processCrawlResultsWithHierarchy( crawlData["results"], parentUrl, parentOperationId, maxDepth, currentDepth ) crawlData["results"] = nestedResults results.append(crawlData) elif "children" in crawlData: # Process children nestedResults = self._processCrawlResultsWithHierarchy( crawlData["children"], parentUrl, parentOperationId, maxDepth, currentDepth ) crawlData["children"] = nestedResults results.append(crawlData) elif "subUrls" in crawlData: # Process sub-URLs nestedResults = self._processCrawlResultsWithHierarchy( crawlData["subUrls"], parentUrl, parentOperationId, maxDepth, currentDepth ) crawlData["subUrls"] = nestedResults results.append(crawlData) else: # Single result dict results.append(crawlData) else: # Other types - wrap in dict results.append({"url": parentUrl, "content": str(crawlData)}) return results def _countUrlsInResults(self, results: Any) -> int: """ Recursively count total URLs in crawl results (including nested sub-URLs). Args: results: Crawl results (dict, list, or other) Returns: Total count of URLs found """ count = 0 if isinstance(results, list): for item in results: count += self._countUrlsInResults(item) elif isinstance(results, dict): # Count this URL if it has a url field if "url" in results or "source" in results: count += 1 # Recursively count nested results if "results" in results: count += self._countUrlsInResults(results["results"]) if "children" in results: count += self._countUrlsInResults(results["children"]) if "subUrls" in results: count += self._countUrlsInResults(results["subUrls"]) elif isinstance(results, str): # Single URL string count = 1 return count