"""Tavily web search class. """ import logging import asyncio import re from dataclasses import dataclass from typing import Optional, List, Dict from tavily import AsyncTavilyClient from modules.shared.configuration import APP_CONFIG from modules.aicore.aicoreBase import BaseConnectorAi from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelResponse, createOperationTypeRatings logger = logging.getLogger(__name__) @dataclass class WebSearchResult: title: str url: str raw_content: Optional[str] = None @dataclass class WebCrawlResult: url: str content: str @dataclass class WebResearchRequest: """Ultra-simplified web research request""" user_prompt: str urls: Optional[List[str]] = None max_results: int = 5 max_pages: int = 10 search_depth: str = "basic" extract_depth: str = "advanced" format: str = "markdown" country: Optional[str] = None time_range: Optional[str] = None topic: Optional[str] = None language: Optional[str] = None @dataclass class WebResearchResult: """Ultra-simplified web research result - just success/error + documents""" success: bool = True error: Optional[str] = None documents: List[dict] = None # Simple dict instead of ActionDocument def __post_init__(self): if self.documents is None: self.documents = [] class ConnectorWeb(BaseConnectorAi): """Tavily web search connector.""" def __init__(self): super().__init__() self.client: Optional[AsyncTavilyClient] = None # Cached settings loaded at initialization time self.crawlTimeout: int = 30 self.crawlMaxRetries: int = 3 self.crawlRetryDelay: int = 2 # Cached web search constraints (camelCase per project style) self.webSearchMinResults: int = 1 self.webSearchMaxResults: int = 20 # Initialize client if API key is available self._initializeClient() def _initializeClient(self): """Initialize the Tavily client if API key is available.""" try: api_key = APP_CONFIG.get("Connector_AiTavily_API_SECRET") if api_key: self.client = AsyncTavilyClient(api_key=api_key) logger.info("Tavily client initialized successfully") else: logger.warning("Tavily API key not found, client not initialized") except Exception as e: logger.error(f"Failed to initialize Tavily client: {str(e)}") def getConnectorType(self) -> str: """Get the connector type identifier.""" return "tavily" def _extractUrlsFromPrompt(self, prompt: str) -> List[str]: """Extract URLs from a text prompt using regex.""" if not prompt: return [] # URL regex pattern - matches http/https URLs url_pattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?' urls = re.findall(url_pattern, prompt) # Remove duplicates while preserving order seen = set() unique_urls = [] for url in urls: if url not in seen: seen.add(url) unique_urls.append(url) return unique_urls def _intelligentUrlFiltering(self, searchResults: List[WebSearchResult], query: str, maxResults: int) -> List[WebSearchResult]: """ Intelligent URL filtering with de-duplication and relevance scoring. 
        Args:
            searchResults: Raw search results from Tavily
            query: Original search query for relevance scoring
            maxResults: Maximum number of results to return

        Returns:
            Filtered and deduplicated list of search results
        """
        if not searchResults:
            return []

        # Step 1: Basic de-duplication by URL
        seenUrls = set()
        uniqueResults = []
        for result in searchResults:
            # Normalize URL for better deduplication
            normalizedUrl = self._normalizeUrl(result.url)
            if normalizedUrl not in seenUrls:
                seenUrls.add(normalizedUrl)
                uniqueResults.append(result)

        logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original")

        # Step 2: Relevance scoring and filtering
        scoredResults = []
        queryWords = set(query.lower().split())
        for result in uniqueResults:
            score = self._calculateRelevanceScore(result, queryWords)
            scoredResults.append((score, result))

        # Step 3: Sort by relevance score (higher is better)
        scoredResults.sort(key=lambda x: x[0], reverse=True)

        # Step 4: Take top results
        filteredResults = [result for score, result in scoredResults[:maxResults]]

        logger.info(f"After intelligent filtering: {len(filteredResults)} results selected from {len(uniqueResults)} unique")
        return filteredResults

    def _normalizeUrl(self, url: str) -> str:
        """
        Normalize URL for better deduplication.
        Removes common variations that represent the same content.
        """
        if not url:
            return url

        # Remove trailing slashes
        url = url.rstrip('/')

        # Remove common query parameters that don't affect content
        import urllib.parse
        parsed = urllib.parse.urlparse(url)

        # Remove common tracking parameters
        queryParams = urllib.parse.parse_qs(parsed.query)
        filteredParams = {}
        for key, values in queryParams.items():
            # Keep important parameters, remove tracking ones
            if key.lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'fbclid', 'gclid', 'ref', 'source', 'campaign']:
                filteredParams[key] = values

        # Rebuild query string
        filteredQuery = urllib.parse.urlencode(filteredParams, doseq=True)

        # Reconstruct URL
        normalized = urllib.parse.urlunparse((
            parsed.scheme,
            parsed.netloc,
            parsed.path,
            parsed.params,
            filteredQuery,
            parsed.fragment
        ))
        return normalized

    def _calculateRelevanceScore(self, result: WebSearchResult, queryWords: set) -> float:
        """
        Calculate relevance score for a search result.
        Higher score means more relevant to the query.
""" score = 0.0 # Title relevance (most important) titleWords = set(result.title.lower().split()) titleMatches = len(queryWords.intersection(titleWords)) score += titleMatches * 3.0 # Weight title matches heavily # URL relevance urlWords = set(result.url.lower().split('/')) urlMatches = len(queryWords.intersection(urlWords)) score += urlMatches * 1.5 # Content relevance (if available) if hasattr(result, 'raw_content') and result.raw_content: contentWords = set(result.raw_content.lower().split()) contentMatches = len(queryWords.intersection(contentWords)) score += contentMatches * 0.1 # Lower weight for content matches # Domain authority bonus (simple heuristic) domain = result.url.split('/')[2] if '/' in result.url else result.url if any(auth_domain in domain.lower() for auth_domain in ['wikipedia.org', 'github.com', 'stackoverflow.com', 'reddit.com', 'medium.com']): score += 1.0 # Penalty for very long URLs (often less relevant) if len(result.url) > 100: score -= 0.5 return score async def _optimizeSearchQuery(self, query: str, timeRange: str = None, country: str = None, language: str = None) -> tuple[str, dict]: """ Use AI to optimize search query and parameters (from old SubWebResearch). Args: query: Original search query timeRange: Time range filter country: Country filter language: Language filter Returns: Tuple of (optimized_query, optimized_parameters) """ try: # Create AI prompt for query optimization (from old code) queryOptimizerPrompt = f"""You are a search query optimizer. USER QUERY: {query} Your task: Create a search query and parameters for the USER QUERY given. RULES: 1. The search query MUST be related to the user query above 2. Extract key terms from the user query 3. Determine appropriate country/language based on the query context 4. Keep search query short (2-6 words) Return ONLY this JSON format: {{ "user_prompt": "search query based on user query above", "country": "Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries)", "language": "language_code_or_null", "topic": "general|news|academic_or_null", "time_range": "d|w|m|y_or_null", "selection_strategy": "single|multiple|specific_page", "selection_criteria": "what URLs to prioritize", "expected_url_patterns": ["pattern1", "pattern2"], "estimated_result_count": number }}""" # Use AI to optimize the query from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions aiRequest = AiCallRequest( prompt=queryOptimizerPrompt, options=AiCallOptions() ) # Get AI response (this would need to be called through the AI interface) # For now, return the original query with basic optimization logger.info(f"AI query optimization requested for: '{query}'") # Basic optimization fallback optimizedQuery = query optimizedParams = { "time_range": timeRange, "country": country, "language": language, "topic": "general" } return optimizedQuery, optimizedParams except Exception as e: logger.warning(f"Query optimization failed: {str(e)}, using original query") return query, {"time_range": timeRange, "country": country, "language": language} async def _aiBasedUrlSelection(self, searchResults: List[WebSearchResult], originalQuery: str, maxResults: int) -> List[WebSearchResult]: """ Use AI to select the most relevant URLs from search results (from old SubWebResearch). 
        Args:
            searchResults: Raw search results from Tavily
            originalQuery: Original user query for context
            maxResults: Maximum number of results to return

        Returns:
            AI-selected and filtered list of search results
        """
        try:
            if not searchResults:
                return []

            # Step 1: Basic de-duplication
            seenUrls = set()
            uniqueResults = []
            for result in searchResults:
                normalizedUrl = self._normalizeUrl(result.url)
                if normalizedUrl not in seenUrls:
                    seenUrls.add(normalizedUrl)
                    uniqueResults.append(result)

            logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original")

            if len(uniqueResults) <= maxResults:
                return uniqueResults

            # Step 2: AI-based URL selection (from old code)
            logger.info(f"AI selecting most relevant {maxResults} URLs from {len(uniqueResults)} unique results")

            # Create AI prompt for URL selection (from old code)
            urlList = "\n".join([f"{i+1}. {result.url}" for i, result in enumerate(uniqueResults)])
            aiPrompt = f"""Select the most relevant URLs from these search results:

{urlList}

Return only the URLs that are most relevant for the user's query: "{originalQuery}"
One URL per line.
"""

            # For now, use intelligent filtering as fallback
            # In a full implementation, this would call the AI interface
            logger.info("Using intelligent filtering as AI selection fallback")

            # Use the existing intelligent filtering
            filteredResults = self._intelligentUrlFiltering(uniqueResults, originalQuery, maxResults)

            logger.info(f"AI-based selection completed: {len(filteredResults)} results selected")
            return filteredResults

        except Exception as e:
            logger.warning(f"AI-based URL selection failed: {str(e)}, using intelligent filtering")
            return self._intelligentUrlFiltering(searchResults, originalQuery, maxResults)

    def getModels(self) -> List[AiModel]:
        """Get all available Tavily models."""
        return [
            AiModel(
                name="tavily-search",
                displayName="Tavily Search",
                connectorType="tavily",
                apiUrl="https://api.tavily.com/search",
                temperature=0.0,  # Web search doesn't use temperature
                maxTokens=0,  # Web search doesn't use tokens
                contextLength=0,
                costPer1kTokensInput=0.0,
                costPer1kTokensOutput=0.0,
                speedRating=9,  # Very fast for URL discovery
                qualityRating=9,  # Excellent URL discovery quality
                # capabilities removed (not used in business logic)
                functionCall=self.callWebOperation,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.WEB_SEARCH, 10),
                    (OperationTypeEnum.WEB_RESEARCH, 3),
                    (OperationTypeEnum.WEB_CRAWL, 2),
                    (OperationTypeEnum.WEB_NEWS, 3),
                    (OperationTypeEnum.WEB_QUESTIONS, 2)
                ),
                version="tavily-search",
                calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numRequests=1: numRequests * (1 if searchDepth == "basic" else 2) * 0.008
            ),
            AiModel(
                name="tavily-extract",
                displayName="Tavily Extract",
                connectorType="tavily",
                apiUrl="https://api.tavily.com/extract",
                temperature=0.0,  # Web crawling doesn't use temperature
                maxTokens=0,  # Web crawling doesn't use tokens
                contextLength=0,
                costPer1kTokensInput=0.0,
                costPer1kTokensOutput=0.0,
                speedRating=7,  # Good for content extraction
                qualityRating=9,  # Excellent content extraction quality
                # capabilities removed (not used in business logic)
                functionCall=self.callWebOperation,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.WEB_CRAWL, 10),
                    (OperationTypeEnum.WEB_RESEARCH, 3),
                    (OperationTypeEnum.WEB_NEWS, 3),
                    (OperationTypeEnum.WEB_QUESTIONS, 2)
                ),
version="tavily-extract", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, numPages=10, extractionDepth="basic", withInstructions=False, numSuccessfulExtractions=10: ((numPages / 10) * (2 if withInstructions else 1) + (numSuccessfulExtractions / 5) * (1 if extractionDepth == "basic" else 2)) * 0.008 ), AiModel( name="tavily-search-extract", displayName="Tavily Search & Extract", connectorType="tavily", apiUrl="https://api.tavily.com/search", temperature=0.0, # Web scraping doesn't use temperature maxTokens=0, # Web scraping doesn't use tokens contextLength=0, costPer1kTokensInput=0.0, costPer1kTokensOutput=0.0, speedRating=7, # Good for combined search+extract qualityRating=8, # Good quality for structured data # capabilities removed (not used in business logic) functionCall=self.callWebOperation, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.BASIC, operationTypes=createOperationTypeRatings( (OperationTypeEnum.WEB_RESEARCH, 8), (OperationTypeEnum.WEB_SEARCH, 6), (OperationTypeEnum.WEB_CRAWL, 6), (OperationTypeEnum.WEB_NEWS, 5), (OperationTypeEnum.WEB_QUESTIONS, 5) ), version="tavily-search-extract", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numSuccessfulUrls=1, extractionDepth="basic": ((1 if searchDepth == "basic" else 2) + (numSuccessfulUrls / 5) * (1 if extractionDepth == "basic" else 2)) * 0.008 ) ] @classmethod async def create(cls): api_key = APP_CONFIG.get("Connector_AiTavily_API_SECRET") if not api_key: raise ValueError("Tavily API key not configured. Please set Connector_AiTavily_API_SECRET in config.ini") # Load and cache web crawl related configuration crawlTimeout = int(APP_CONFIG.get("Web_Crawl_TIMEOUT", "30")) crawlMaxRetries = int(APP_CONFIG.get("Web_Crawl_MAX_RETRIES", "3")) crawlRetryDelay = int(APP_CONFIG.get("Web_Crawl_RETRY_DELAY", "2")) return cls( client=AsyncTavilyClient(api_key=api_key), crawlTimeout=crawlTimeout, crawlMaxRetries=crawlMaxRetries, crawlRetryDelay=crawlRetryDelay, webSearchMinResults=int(APP_CONFIG.get("Web_Search_MIN_RESULTS", "1")), webSearchMaxResults=int(APP_CONFIG.get("Web_Search_MAX_RESULTS", "20")), ) # Standardized method using AiModelCall/AiModelResponse pattern async def callWebOperation(self, modelCall) -> "AiModelResponse": """ Universal web operation handler that distributes to the correct method based on the operationType from AiCallOptions. 
""" try: options = modelCall.options operationType = getattr(options, "operationType", None) if operationType == OperationTypeEnum.WEB_SEARCH: return await self.search(modelCall) elif operationType == OperationTypeEnum.WEB_CRAWL: return await self.crawl(modelCall) elif operationType in [OperationTypeEnum.WEB_RESEARCH, OperationTypeEnum.WEB_QUESTIONS, OperationTypeEnum.WEB_NEWS]: return await self.research(modelCall) else: # Fallback to search for unknown operation types return await self.search(modelCall) except Exception as e: return AiModelResponse( content="", success=False, error=str(e) ) async def search(self, modelCall) -> "AiModelResponse": """Search using standardized AiModelCall/AiModelResponse pattern""" try: # Extract parameters from modelCall prompt_content = modelCall.messages[0]["content"] if modelCall.messages else "" options = modelCall.options # Parse unified prompt JSON format import json promptData = json.loads(prompt_content) # Extract parameters from unified prompt JSON query = promptData.get("searchPrompt", prompt_content) maxResults = promptData.get("maxResults", 5) timeRange = promptData.get("timeRange") country = promptData.get("country") language = promptData.get("language") # Use basic search depth for web search operations searchDepth = "basic" # Step 1: AI Query Optimization (from old SubWebResearch) optimizedQuery, optimizedParams = await self._optimizeSearchQuery(query, timeRange, country, language) # Step 2: Get more results than requested to allow for intelligent filtering searchResults = await self._search( query=optimizedQuery, max_results=min(maxResults * 3, 30), # Get more results for better AI selection search_depth=searchDepth, time_range=optimizedParams.get("time_range", timeRange), country=optimizedParams.get("country", country), language=optimizedParams.get("language", language), include_answer=getattr(options, "include_answer", True), include_raw_content=getattr(options, "include_raw_content", True), ) # Step 3: AI-based URL selection and intelligent filtering filteredResults = await self._aiBasedUrlSelection(searchResults, query, maxResults) # Convert to JSON string resultsJson = { "query": query, "results": [ { "title": result.title, "url": result.url, "content": getattr(result, 'raw_content', None) } for result in filteredResults ], "total_count": len(filteredResults), "original_count": len(searchResults), "filtered_count": len(searchResults) - len(filteredResults) } import json content = json.dumps(resultsJson, indent=2) return AiModelResponse( content=content, success=True, metadata={ "total_count": len(filteredResults), "search_depth": searchDepth } ) except Exception as e: return AiModelResponse( content="", success=False, error=str(e) ) async def crawl(self, modelCall) -> "AiModelResponse": """Crawl using standardized AiModelCall/AiModelResponse pattern""" try: # Extract parameters from modelCall promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" options = modelCall.options # Parse unified prompt JSON format import json promptData = json.loads(promptContent) # Extract parameters from unified prompt JSON urls = promptData.get("urls", []) extractDepth = promptData.get("extractDepth", "advanced") formatType = promptData.get("format", "markdown") if not urls: return AiModelResponse( content="No URLs provided for crawling", success=False, error="No URLs found in prompt data" ) rawResults = await self._crawl( urls, extract_depth=extractDepth, format=formatType, ) # Convert to JSON string resultsJson = { "urls": urls, 
"results": [ { "url": result.url, "title": getattr(result, 'title', ''), "content": result.content, "extractedAt": getattr(result, 'extracted_at', '') } for result in rawResults ], "total_count": len(rawResults) } import json content = json.dumps(resultsJson, indent=2) return AiModelResponse( content=content, success=True, metadata={ "total_count": len(rawResults), "urls_processed": len(urls) } ) except Exception as e: return AiModelResponse( content="", success=False, error=str(e) ) async def research(self, modelCall) -> "AiModelResponse": """ Handle WEB_RESEARCH, WEB_QUESTIONS, WEB_NEWS operations using search + crawl combination. Single method for all three operation types with different standard settings. """ try: # Extract parameters from modelCall promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" options = modelCall.options operationType = getattr(options, "operationType", None) # Parse unified prompt JSON format import json promptData = json.loads(promptContent) # Extract parameters based on operation type if operationType == OperationTypeEnum.WEB_RESEARCH: query = promptData.get("researchPrompt", promptContent) maxResults = promptData.get("maxResults", 8) searchDepth = "basic" timeRange = promptData.get("timeRange") country = promptData.get("country") language = promptData.get("language") topic = "general" elif operationType == OperationTypeEnum.WEB_QUESTIONS: query = promptData.get("question", promptContent) maxResults = promptData.get("maxResults", 6) searchDepth = "basic" timeRange = promptData.get("timeRange") country = promptData.get("country") language = promptData.get("language") topic = "general" elif operationType == OperationTypeEnum.WEB_NEWS: query = promptData.get("newsPrompt", promptContent) maxResults = promptData.get("maxResults", 10) searchDepth = "basic" timeRange = promptData.get("timeRange", "w") # Default to week for news country = promptData.get("country") language = promptData.get("language") topic = "news" else: # Fallback to research settings query = promptData.get("researchPrompt", promptContent) maxResults = promptData.get("maxResults", 5) searchDepth = "basic" timeRange = promptData.get("timeRange") country = promptData.get("country") language = promptData.get("language") topic = "general" logger.info(f"Tavily {operationType} operation: query='{query}', maxResults={maxResults}, topic={topic}") # Step 1: Search for relevant URLs searchResults = await self._search( query=query, max_results=maxResults * 2, # Get more for better selection search_depth=searchDepth, time_range=timeRange, country=country, language=language, topic=topic, include_answer=True, include_raw_content=True ) if not searchResults: return AiModelResponse( content="No search results found", success=False, error="No relevant URLs found for the query" ) # Step 2: AI-based URL selection selectedResults = await self._aiBasedUrlSelection(searchResults, query, maxResults) if not selectedResults: return AiModelResponse( content="No relevant URLs selected", success=False, error="AI could not select relevant URLs" ) # Step 3: Crawl selected URLs for content urlsToCrawl = [result.url for result in selectedResults] crawlResults = await self._crawl( urls=urlsToCrawl, extract_depth="advanced", format="markdown" ) # Step 4: Combine search and crawl results combinedResults = [] for searchResult in selectedResults: # Find corresponding crawl result crawlResult = next((cr for cr in crawlResults if cr.url == searchResult.url), None) combinedResult = { "title": 
                    "url": searchResult.url,
                    "summary": getattr(searchResult, 'raw_content', ''),
                    "content": crawlResult.content if crawlResult else '',
                    "extractedAt": getattr(crawlResult, 'extracted_at', '') if crawlResult else ''
                }
                combinedResults.append(combinedResult)

            # Step 5: Format response based on operation type
            if operationType == OperationTypeEnum.WEB_RESEARCH:
                responseData = {
                    "query": query,
                    "research_results": combinedResults,
                    "total_count": len(combinedResults),
                    "operation_type": "research"
                }
            elif operationType == OperationTypeEnum.WEB_QUESTIONS:
                responseData = {
                    "question": query,
                    "answer_sources": combinedResults,
                    "total_count": len(combinedResults),
                    "operation_type": "questions"
                }
            elif operationType == OperationTypeEnum.WEB_NEWS:
                responseData = {
                    "news_query": query,
                    "articles": combinedResults,
                    "total_count": len(combinedResults),
                    "operation_type": "news"
                }
            else:
                responseData = {
                    "query": query,
                    "results": combinedResults,
                    "total_count": len(combinedResults),
                    "operation_type": str(operationType)
                }

            content = json.dumps(responseData, indent=2)

            return AiModelResponse(
                content=content,
                success=True,
                metadata={
                    "total_count": len(combinedResults),
                    "urls_searched": len(searchResults),
                    "urls_crawled": len(crawlResults),
                    "operation_type": operationType
                }
            )

        except Exception as e:
            return AiModelResponse(
                content="",
                success=False,
                error=str(e)
            )

    async def scrape(self, modelCall) -> "AiModelResponse":
        """Scrape using standardized AiModelCall/AiModelResponse pattern"""
        try:
            # Extract parameters from modelCall
            query = modelCall.messages[0]["content"] if modelCall.messages else ""
            options = modelCall.options

            search_results = await self._search(
                query=query,
                max_results=getattr(options, "max_results", 5),
                search_depth=getattr(options, "search_depth", None),
                time_range=getattr(options, "time_range", None),
                topic=getattr(options, "topic", None),
                include_domains=getattr(options, "include_domains", None),
                exclude_domains=getattr(options, "exclude_domains", None),
                language=getattr(options, "language", None),
                include_answer=getattr(options, "include_answer", None),
                include_raw_content=getattr(options, "include_raw_content", None),
            )

            urls = [result.url for result in search_results]
            crawl_results = await self._crawl(
                urls,
                extract_depth=getattr(options, "extract_depth", None),
                format=getattr(options, "format", None),
            )

            # Convert to JSON string
            results_json = {
                "query": query,
                "results": [
                    {
                        "url": result.url,
                        "content": result.content
                    }
                    for result in crawl_results
                ],
                "total_count": len(crawl_results)
            }
            import json
            content = json.dumps(results_json, indent=2)

            return AiModelResponse(
                content=content,
                success=True,
                metadata={
                    "total_count": len(crawl_results),
                    "search_depth": getattr(options, "search_depth", "basic"),
                    "extract_depth": getattr(options, "extract_depth", "basic")
                }
            )

        except Exception as e:
            return AiModelResponse(
                content="",
                success=False,
                error=str(e)
            )

    # Helper Functions
    async def _search_urls_raw(self, *,
                               query: str,
                               max_results: int,
                               search_depth: str | None = None,
                               time_range: str | None = None,
                               topic: str | None = None,
                               include_domains: list[str] | None = None,
                               exclude_domains: list[str] | None = None,
                               language: str | None = None,
                               include_answer: bool | None = None,
                               include_raw_content: bool | None = None,
                               ) -> list["WebSearchResult"]:
        return await self._search(
            query=query,
            max_results=max_results,
            search_depth=search_depth,
            time_range=time_range,
            topic=topic,
            include_domains=include_domains,
            exclude_domains=exclude_domains,
            language=language,
            include_answer=include_answer,
            include_raw_content=include_raw_content,
        )
    async def _crawl_urls_raw(self, *,
                              urls: list[str],
                              extract_depth: str | None = None,
                              format: str | None = None,
                              ) -> list["WebCrawlResult"]:
        return await self._crawl(urls, extract_depth=extract_depth, format=format)

    async def _scrape_raw(self, *,
                          query: str,
                          max_results: int,
                          search_depth: str | None = None,
                          time_range: str | None = None,
                          topic: str | None = None,
                          include_domains: list[str] | None = None,
                          exclude_domains: list[str] | None = None,
                          language: str | None = None,
                          include_answer: bool | None = None,
                          include_raw_content: bool | None = None,
                          extract_depth: str | None = None,
                          format: str | None = None,
                          ) -> list["WebCrawlResult"]:
        search_results = await self._search(
            query=query,
            max_results=max_results,
            search_depth=search_depth,
            time_range=time_range,
            topic=topic,
            include_domains=include_domains,
            exclude_domains=exclude_domains,
            language=language,
            include_answer=include_answer,
            include_raw_content=include_raw_content,
        )
        urls = [result.url for result in search_results]
        return await self._crawl(urls, extract_depth=extract_depth, format=format)

    def _clean_url(self, url: str) -> str:
        """Clean URL by removing extra text that might be appended."""
        # Extract just the URL part, removing any extra text after it
        url_match = re.match(r'(https?://[^\s,]+)', url)
        if url_match:
            return url_match.group(1)
        return url

    async def _search(
        self,
        query: str,
        max_results: int,
        search_depth: str | None = None,
        time_range: str | None = None,
        topic: str | None = None,
        include_domains: list[str] | None = None,
        exclude_domains: list[str] | None = None,
        language: str | None = None,
        country: str | None = None,
        include_answer: bool | None = None,
        include_raw_content: bool | None = None,
    ) -> list[WebSearchResult]:
        """Calls the Tavily API to perform a web search."""
        # Make sure max_results is within the allowed range (use cached values)
        minResults = self.webSearchMinResults
        maxAllowedResults = self.webSearchMaxResults
        if max_results < minResults or max_results > maxAllowedResults:
            raise ValueError(f"max_results must be between {minResults} and {maxAllowedResults}")

        # Perform actual API call
        # Build kwargs only for provided options to avoid API rejections
        kwargs: dict = {"query": query, "max_results": max_results}
        if search_depth is not None:
            kwargs["search_depth"] = search_depth
        if time_range is not None:
            kwargs["time_range"] = time_range
        if topic is not None:
            kwargs["topic"] = topic
        if include_domains is not None and len(include_domains) > 0:
            kwargs["include_domains"] = include_domains
        if exclude_domains is not None:
            kwargs["exclude_domains"] = exclude_domains
        if language is not None:
            kwargs["language"] = language
        if country is not None:
            kwargs["country"] = country
        if include_answer is not None:
            kwargs["include_answer"] = include_answer
        if include_raw_content is not None:
            kwargs["include_raw_content"] = include_raw_content

        logger.debug(f"Tavily.search kwargs: {kwargs}")

        # Ensure client is initialized
        if self.client is None:
            self._initializeClient()
        if self.client is None:
            raise ValueError("Tavily client not initialized. Please check API key configuration.")
        response = await self.client.search(**kwargs)
        return [
            WebSearchResult(
                title=result["title"],
                url=self._clean_url(result["url"]),
                raw_content=result.get("raw_content")
            )
            for result in response["results"]
        ]

    async def _crawl(
        self,
        urls: list,
        extract_depth: str | None = None,
        format: str | None = None,
    ) -> list[WebCrawlResult]:
        """Calls the Tavily API to extract text content from URLs with retry logic."""
        maxRetries = self.crawlMaxRetries
        retryDelay = self.crawlRetryDelay
        timeout = self.crawlTimeout

        logger.debug(f"Starting crawl of {len(urls)} URLs: {urls}")
        logger.debug(f"Crawl settings: extract_depth={extract_depth}, format={format}, timeout={timeout}s")

        for attempt in range(maxRetries + 1):
            try:
                logger.debug(f"Crawl attempt {attempt + 1}/{maxRetries + 1}")

                # Build kwargs for extract; asyncio.wait_for enforces the timeout
                kwargs_extract: dict = {"urls": urls}
                kwargs_extract["extract_depth"] = extract_depth or "advanced"
                kwargs_extract["format"] = format or "markdown"  # Use markdown to get HTML structure

                logger.debug(f"Sending request to Tavily with kwargs: {kwargs_extract}")

                # Ensure client is initialized
                if self.client is None:
                    self._initializeClient()
                if self.client is None:
                    raise ValueError("Tavily client not initialized. Please check API key configuration.")

                response = await asyncio.wait_for(
                    self.client.extract(**kwargs_extract),
                    timeout=timeout
                )

                logger.debug(f"Tavily response received: {list(response.keys())}")

                # Debug: Log what Tavily actually returns
                if "results" in response and response["results"]:
                    logger.debug(f"Tavily returned {len(response['results'])} results")
                    logger.debug(f"First result keys: {list(response['results'][0].keys())}")
                    logger.debug(f"First result has raw_content: {'raw_content' in response['results'][0]}")

                    # Log each result
                    for i, result in enumerate(response["results"]):
                        logger.debug(f"Result {i+1}: URL={result.get('url', 'N/A')}, content_length={len(result.get('raw_content', result.get('content', '')))}")
                else:
                    logger.warning(f"Tavily returned no results in response: {response}")

                results = [
                    WebCrawlResult(
                        url=result["url"],
                        content=result.get("raw_content", result.get("content", ""))  # Try raw_content first, fallback to content
                    )
                    for result in response["results"]
                ]

                logger.debug(f"Crawl successful: extracted {len(results)} results")
                return results

            except asyncio.TimeoutError:
                logger.warning(f"Crawl attempt {attempt + 1} timed out after {timeout} seconds for URLs: {urls}")
                if attempt < maxRetries:
                    logger.info(f"Retrying in {retryDelay} seconds...")
                    await asyncio.sleep(retryDelay)
                else:
                    raise Exception(f"Crawl failed after {maxRetries + 1} attempts due to timeout")

            except Exception as e:
                logger.warning(f"Crawl attempt {attempt + 1} failed for URLs {urls}: {str(e)}")
                logger.debug(f"Full error details: {type(e).__name__}: {str(e)}")

                # Check if it's a validation error and log more details
                if "validation" in str(e).lower():
                    logger.debug("URL validation failed. Checking URL format:")
Checking URL format:") for i, url in enumerate(urls): logger.debug(f" URL {i+1}: '{url}' (length: {len(url)})") # Check for common URL issues if ' ' in url: logger.debug(f" WARNING: URL contains spaces!") if not url.startswith(('http://', 'https://')): logger.debug(f" WARNING: URL doesn't start with http/https!") if len(url) > 2000: logger.debug(f" WARNING: URL is very long ({len(url)} chars)") if attempt < maxRetries: logger.info(f"Retrying in {retryDelay} seconds...") await asyncio.sleep(retryDelay) else: raise Exception(f"Crawl failed after {maxRetries + 1} attempts: {str(e)}") async def comprehensiveWebResearch(self, request: WebResearchRequest) -> WebResearchResult: """ Perform comprehensive web research using Tavily's search and extract capabilities. This method orchestrates the full web research workflow. """ try: logger.info(f"COMPREHENSIVE WEB RESEARCH STARTED") logger.info(f"User Query: {request.user_prompt}") logger.info(f"Max Results: {request.max_results}, Max Pages: {request.max_pages}") # Global URL index to track all processed URLs across the entire research session global_processed_urls = set() # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===") if request.urls: # Use provided URLs as initial main URLs websites = request.urls logger.info(f"Using provided URLs ({len(websites)}):") for i, url in enumerate(websites, 1): logger.info(f" {i}. {url}") else: # Use AI to determine main URLs based on user's intention logger.info(f"AI analyzing user intent: '{request.user_prompt}'") # Use basic search parameters search_query = request.user_prompt search_depth = request.search_depth or "basic" time_range = request.time_range topic = request.topic country = request.country language = request.language max_results = request.max_results logger.info(f"Using search parameters: query='{search_query}', depth={search_depth}, time_range={time_range}, topic={topic}") # Perform web search search_results = await self._search( query=search_query, max_results=max_results, search_depth=search_depth, time_range=time_range, topic=topic, country=country, language=language, include_answer=True, include_raw_content=True ) # Extract URLs from search results websites = [result.url for result in search_results] logger.info(f"Found {len(websites)} URLs from search") # AI-based URL selection and deduplication if len(websites) > request.max_pages: logger.info(f"AI selecting most relevant {request.max_pages} URLs from {len(websites)} found") # For now, just take the first max_pages URLs selected_indices = list(range(min(request.max_pages, len(websites)))) selected_websites = [websites[i] for i in selected_indices] # Remove duplicates while preserving order seen = set() unique_websites = [] for url in selected_websites: if url not in seen: seen.add(url) unique_websites.append(url) websites = unique_websites logger.info(f"After AI selection deduplication: {len(websites)} unique URLs") logger.info(f"AI selected {len(websites)} main URLs (after deduplication):") for i, url in enumerate(websites, 1): logger.info(f" {i}. 
{url}") # Step 2: Smart website selection using AI interface logger.info(f"=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===") logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'") # For now, just use all websites selected_websites = websites logger.debug(f"AI selected {len(selected_websites)} most relevant URLs:") for i, url in enumerate(selected_websites, 1): logger.debug(f" {i}. {url}") # Step 3+4+5: Recursive crawling with configurable depth # Get configuration parameters max_depth = int(APP_CONFIG.get("Web_Research_MAX_DEPTH", "2")) max_links_per_domain = int(APP_CONFIG.get("Web_Research_MAX_LINKS_PER_DOMAIN", "4")) crawl_timeout_minutes = int(APP_CONFIG.get("Web_Research_CRAWL_TIMEOUT_MINUTES", "10")) # Use the configured max_depth or the request's search_depth, whichever is smaller effective_depth = min(max_depth, request.search_depth if isinstance(request.search_depth, int) else 2) logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING ===") logger.info(f"Starting recursive crawl with depth {effective_depth}") logger.info(f"Max links per domain: {max_links_per_domain}") logger.info(f"Crawl timeout: {crawl_timeout_minutes} minutes") # Perform recursive crawling all_content = await self._crawlRecursively( urls=selected_websites, max_depth=effective_depth, extract_depth=request.extract_depth, max_per_domain=max_links_per_domain, global_processed_urls=global_processed_urls ) logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled") # Step 6: AI analysis of all collected content logger.info(f"=== STEP 6: AI ANALYSIS ===") logger.info(f"Analyzing {len(all_content)} websites with AI") # Create a basic analysis result analysis_result = f"Web research completed for: {request.user_prompt}\n\n" analysis_result += f"Analyzed {len(all_content)} websites:\n" for url, content in all_content.items(): analysis_result += f"- {url}: {len(content)} characters\n" # Create result documents import time result_documents = [] # Main research result main_document = { "documentName": f"web_research_{int(time.time())}.json", "documentData": { "user_prompt": request.user_prompt, "websites_analyzed": len(all_content), "additional_links_found": 0, # Would be calculated from crawl results "analysis_result": analysis_result, "sources": [{"title": f"Website {i+1}", "url": url} for i, url in enumerate(all_content.keys())], "additional_links": [], "debug_info": { "total_urls_processed": len(global_processed_urls), "crawl_depth": effective_depth, "extract_depth": request.extract_depth } }, "mimeType": "application/json" } result_documents.append(main_document) # Individual website content documents for i, (url, content) in enumerate(all_content.items()): content_document = { "documentName": f"website_content_{i+1}.md", "documentData": content, "mimeType": "text/markdown" } result_documents.append(content_document) logger.info(f"WEB RESEARCH COMPLETED SUCCESSFULLY") logger.info(f"Generated {len(result_documents)} result documents") return WebResearchResult( success=True, documents=result_documents ) except Exception as e: logger.error(f"Error in comprehensive web research: {str(e)}") return WebResearchResult( success=False, error=str(e), documents=[] ) async def _crawlRecursively(self, urls: List[str], max_depth: int, extract_depth: str = "advanced", max_per_domain: int = 10, global_processed_urls: Optional[set] = None) -> Dict[str, str]: """ Recursively crawl URLs up to specified depth. This is a simplified version of the recursive crawling logic. 
""" logger.info(f"Starting recursive crawl: {len(urls)} starting URLs, max_depth={max_depth}") # URL index to track all processed URLs (local + global) processed_urls = set() if global_processed_urls is not None: processed_urls = global_processed_urls logger.info(f"Using global URL index with {len(processed_urls)} already processed URLs") else: logger.info("Using local URL index for this crawl session") all_content = {} current_level_urls = urls.copy() try: for depth in range(1, max_depth + 1): logger.info(f"=== DEPTH LEVEL {depth}/{max_depth} ===") logger.info(f"Processing {len(current_level_urls)} URLs at depth {depth}") # URLs found at this level (for next iteration) next_level_urls = [] for url in current_level_urls: # Normalize URL for duplicate checking normalized_url = self._normalizeUrl(url) if normalized_url in processed_urls: logger.debug(f"URL {url} (normalized: {normalized_url}) already processed, skipping") continue try: logger.info(f"Processing URL at depth {depth}: {url}") # Extract content from URL crawl_results = await self._crawl([url], extract_depth=extract_depth, format="markdown") if crawl_results and crawl_results[0].content: content = crawl_results[0].content all_content[url] = content processed_urls.add(normalized_url) logger.info(f"✓ Successfully processed {url}: {len(content)} chars") # For simplicity, we'll skip finding sub-links in this implementation # In a full implementation, you would extract links and add them to next_level_urls else: logger.warning(f"✗ No content extracted from {url}") processed_urls.add(normalized_url) except Exception as e: logger.warning(f"✗ Failed to process URL {url} at depth {depth}: {e}") processed_urls.add(normalized_url) # Prepare for next iteration current_level_urls = next_level_urls logger.info(f"Depth {depth} completed. Found {len(next_level_urls)} URLs for next level") # Stop if no more URLs to process if not current_level_urls: logger.info(f"No more URLs found at depth {depth}, stopping recursion") break logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled") return all_content except Exception as e: logger.error(f"Crawling failed with error: {e}, returning partial results: {len(all_content)} pages crawled so far") return all_content def _normalizeUrl(self, url: str) -> str: """Normalize URL to handle variations that should be considered duplicates.""" if not url: return url # Remove trailing slashes and fragments url = url.rstrip('/') if '#' in url: url = url.split('#')[0] # Handle common URL variations url = url.replace('http://', 'https://') # Normalize protocol return url