# Copyright (c) 2025 Patrick Motsch # All rights reserved. """Tavily web search class. """ import logging import asyncio import re from dataclasses import dataclass from typing import Optional, List, Dict from tavily import AsyncTavilyClient from modules.shared.configuration import APP_CONFIG from .aicoreBase import BaseConnectorAi from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings, AiCallPromptWebSearch, AiCallPromptWebCrawl from modules.datamodels.datamodelTools import CountryCodes logger = logging.getLogger(__name__) @dataclass class WebSearchResult: title: str url: str rawContent: Optional[str] = None @dataclass class WebCrawlResult: url: str content: str title: Optional[str] = None class AiTavily(BaseConnectorAi): """Tavily web search connector.""" def __init__(self): super().__init__() self.client: Optional[AsyncTavilyClient] = None # Cached settings loaded at initialization time self.crawlTimeout: int = 30 self.crawlMaxRetries: int = 3 self.crawlRetryDelay: int = 2 # Cached web search constraints (camelCase per project style) self.webSearchMinResults: int = 1 self.webSearchMaxResults: int = 20 # Initialize client if API key is available self._initializeClient() def getModels(self) -> List[AiModel]: """Get all available Tavily models.""" return [ AiModel( name="tavily-search", displayName="Tavily Search & Research", connectorType="tavily", apiUrl="https://api.tavily.com", temperature=0.0, # Web search doesn't use temperature maxTokens=0, # Web search doesn't use tokens contextLength=0, costPer1kTokensInput=0.0, costPer1kTokensOutput=0.0, speedRating=8, # Good speed for search and extract qualityRating=9, # Excellent quality for web research # capabilities removed (not used in business logic) functionCall=self._routeWebOperation, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.BASIC, operationTypes=createOperationTypeRatings( (OperationTypeEnum.WEB_SEARCH_DATA, 9), (OperationTypeEnum.WEB_CRAWL, 10) ), version="tavily-search", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: 0.008 # Simple flat rate ) ] def _initializeClient(self): """Initialize the Tavily client if API key is available.""" try: apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET") if apiKey: self.client = AsyncTavilyClient(api_key=apiKey) logger.info("Tavily client initialized successfully") else: logger.warning("Tavily API key not found, client not initialized") except Exception as e: logger.error(f"Failed to initialize Tavily client: {str(e)}") def getConnectorType(self) -> str: """Get the connector type identifier.""" return "tavily" def _convertIsoCodeToCountryName(self, isoCode: str) -> str: """ Convert ISO-2 country code to Tavily country name. Uses centralized CountryCodes mapping. """ return CountryCodes.getForTavily(isoCode) def _extractUrlsFromPrompt(self, prompt: str) -> List[str]: """Extract URLs from a text prompt using regex.""" if not prompt: return [] # URL regex pattern - matches http/https URLs urlPattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?' urls = re.findall(urlPattern, prompt) # Remove duplicates while preserving order seen = set() uniqueUrls = [] for url in urls: if url not in seen: seen.add(url) uniqueUrls.append(url) return uniqueUrls def _normalizeUrl(self, url: str) -> str: """ Normalize URL for better deduplication. Removes common variations that represent the same content. """ if not url: return url # Remove trailing slashes url = url.rstrip('/') # Remove common query parameters that don't affect content import urllib.parse parsed = urllib.parse.urlparse(url) # Remove common tracking parameters queryParams = urllib.parse.parse_qs(parsed.query) filteredParams = {} for key, values in queryParams.items(): # Keep important parameters, remove tracking ones if key.lower() not in ['utm_source', 'utm_medium', 'utm_campaign', 'utm_term', 'utm_content', 'fbclid', 'gclid', 'ref', 'source', 'campaign']: filteredParams[key] = values # Rebuild query string filteredQuery = urllib.parse.urlencode(filteredParams, doseq=True) # Reconstruct URL normalized = urllib.parse.urlunparse(( parsed.scheme, parsed.netloc, parsed.path, parsed.params, filteredQuery, parsed.fragment )) return normalized def _calculateRelevanceScore(self, result: WebSearchResult, queryWords: set) -> float: """ Calculate relevance score for a search result. Higher score means more relevant to the query. """ score = 0.0 # Title relevance (most important) titleWords = set(result.title.lower().split()) titleMatches = len(queryWords.intersection(titleWords)) score += titleMatches * 3.0 # Weight title matches heavily # URL relevance urlWords = set(result.url.lower().split('/')) urlMatches = len(queryWords.intersection(urlWords)) score += urlMatches * 1.5 # Content relevance (if available) if hasattr(result, 'rawContent') and result.rawContent: contentWords = set(result.rawContent.lower().split()) contentMatches = len(queryWords.intersection(contentWords)) score += contentMatches * 0.1 # Lower weight for content matches # Domain authority bonus (simple heuristic) domain = result.url.split('/')[2] if '/' in result.url else result.url if any(authDomain in domain.lower() for authDomain in ['wikipedia.org', 'github.com', 'stackoverflow.com', 'reddit.com', 'medium.com']): score += 1.0 # Penalty for very long URLs (often less relevant) if len(result.url) > 100: score -= 0.5 return score def _intelligentUrlFiltering(self, searchResults: List[WebSearchResult], query: str, maxResults: int) -> List[WebSearchResult]: """ Intelligent URL filtering with de-duplication and relevance scoring. Args: searchResults: Raw search results from Tavily query: Original search query for relevance scoring maxResults: Maximum number of results to return Returns: Filtered and deduplicated list of search results """ if not searchResults: return [] # Step 1: Basic de-duplication by URL seenUrls = set() uniqueResults = [] for result in searchResults: # Normalize URL for better deduplication normalizedUrl = self._normalizeUrl(result.url) if normalizedUrl not in seenUrls: seenUrls.add(normalizedUrl) uniqueResults.append(result) logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original") # Step 2: Relevance scoring and filtering scoredResults = [] queryWords = set(query.lower().split()) for result in uniqueResults: score = self._calculateRelevanceScore(result, queryWords) scoredResults.append((score, result)) # Step 3: Sort by relevance score (higher is better) scoredResults.sort(key=lambda x: x[0], reverse=True) # Step 4: Take top results filteredResults = [result for score, result in scoredResults[:maxResults]] logger.info(f"After intelligent filtering: {len(filteredResults)} results selected from {len(uniqueResults)} unique") return filteredResults @classmethod async def create(cls): apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET") if not apiKey: raise ValueError("Tavily API key not configured. Please set Connector_AiTavily_API_SECRET in config.ini") # Load and cache web crawl related configuration crawlTimeout = int(APP_CONFIG.get("Web_Crawl_TIMEOUT", "30")) crawlMaxRetries = int(APP_CONFIG.get("Web_Crawl_MAX_RETRIES", "3")) crawlRetryDelay = int(APP_CONFIG.get("Web_Crawl_RETRY_DELAY", "2")) return cls( client=AsyncTavilyClient(api_key=apiKey), crawlTimeout=crawlTimeout, crawlMaxRetries=crawlMaxRetries, crawlRetryDelay=crawlRetryDelay, webSearchMinResults=int(APP_CONFIG.get("Web_Search_MIN_RESULTS", "1")), webSearchMaxResults=int(APP_CONFIG.get("Web_Search_MAX_RESULTS", "20")), ) # Standardized method using AiModelCall/AiModelResponse pattern def _cleanUrl(self, url: str) -> str: """Clean URL by removing extra text that might be appended.""" import re # Extract just the URL part, removing any extra text after it urlMatch = re.match(r'(https?://[^\s,]+)', url) if urlMatch: return urlMatch.group(1) return url async def _search( self, query: str, maxResults: int, searchDepth: str | None = None, timeRange: str | None = None, topic: str | None = None, includeDomains: list[str] | None = None, excludeDomains: list[str] | None = None, country: str | None = None, includeAnswer: str | None = None, includeRawContent: str | None = None, ) -> list[WebSearchResult]: """Calls the Tavily API to perform a web search.""" # Make sure maxResults is within the allowed range (use cached values) minResults = self.webSearchMinResults maxAllowedResults = self.webSearchMaxResults if maxResults < minResults or maxResults > maxAllowedResults: raise ValueError(f"maxResults must be between {minResults} and {maxAllowedResults}") # Perform actual API call # Build kwargs only for provided options to avoid API rejections kwargs: dict = {"query": query, "max_results": maxResults} if searchDepth is not None: kwargs["search_depth"] = searchDepth if timeRange is not None: kwargs["time_range"] = timeRange if topic is not None: kwargs["topic"] = topic if includeDomains is not None and len(includeDomains) > 0: kwargs["include_domains"] = includeDomains if excludeDomains is not None: kwargs["exclude_domains"] = excludeDomains if country is not None: kwargs["country"] = country if includeAnswer is not None: kwargs["include_answer"] = includeAnswer if includeRawContent is not None: kwargs["include_raw_content"] = includeRawContent # Log the final API call parameters for comparison logger.info(f"Tavily API call parameters: {kwargs}") # Ensure client is initialized if self.client is None: self._initializeClient() if self.client is None: raise ValueError("Tavily client not initialized. Please check API key configuration.") response = await self.client.search(**kwargs) # Return all results without score filtering # Tavily's scoring is already applied by the API results_count = len(response.get('results', [])) logger.info(f"Tavily returned {results_count} results") # Log content availability results_with_content = 0 for result in response.get('results', []): if result.get("raw_content"): results_with_content += 1 logger.info(f"Tavily results with raw_content: {results_with_content}/{results_count}") # Log first result structure for debugging if response.get('results') and len(response['results']) > 0: first_result = response['results'][0] logger.debug(f"First result keys: {list(first_result.keys())}") raw_content = first_result.get('raw_content') or '' logger.debug(f"First result has raw_content: {'raw_content' in first_result}, content length: {len(raw_content)}") return [ WebSearchResult( title=result.get("title", ""), url=self._cleanUrl(result.get("url", "")), rawContent=result.get("raw_content") or result.get("content") or "" ) for result in response["results"] ] async def _crawl( self, url: str, instructions: str | None = None, limit: int = 20, maxDepth: int = 2, maxBreadth: int = 40, ) -> list[WebCrawlResult]: """Calls the Tavily API to crawl ONE URL with link following and retry logic.""" maxRetries = self.crawlMaxRetries retryDelay = self.crawlRetryDelay timeout = self.crawlTimeout logger.info(f"Starting crawl of URL: {url}") logger.info(f"Crawl settings: instructions={instructions[:100] if instructions else None}..., limit={limit}, maxDepth={maxDepth}, maxBreadth={maxBreadth}, timeout={timeout}s") for attempt in range(maxRetries + 1): try: logger.debug(f"Crawl attempt {attempt + 1}/{maxRetries + 1}") # Ensure client is initialized if self.client is None: self._initializeClient() if self.client is None: raise ValueError("Tavily client not initialized. Please check API key configuration.") logger.debug(f"Crawling URL: {url}") # Build kwargs for crawl kwargsCrawl: dict = {"url": url} if instructions: kwargsCrawl["instructions"] = instructions if limit: kwargsCrawl["limit"] = limit if maxDepth: kwargsCrawl["max_depth"] = maxDepth if maxBreadth: kwargsCrawl["max_breadth"] = maxBreadth logger.info(f"Sending request to Tavily API with parameters: {kwargsCrawl}") response = await asyncio.wait_for( self.client.crawl(**kwargsCrawl), timeout=timeout ) logger.debug(f"Tavily response received: {type(response)}") # Parse response - could be dict with results or list if isinstance(response, dict): if "results" in response: pageResults = response["results"] logger.debug(f"Found 'results' key in response dict with {len(pageResults)} items") else: logger.warning(f"Response dict keys: {list(response.keys())}") # Check for other possible keys if "pages" in response: pageResults = response["pages"] logger.debug(f"Found 'pages' key with {len(pageResults)} items") elif "content" in response: # Single page result pageResults = [response] logger.debug("Found 'content' key, treating as single page result") else: logger.warning(f"Unexpected response dict structure: {list(response.keys())}") pageResults = [] elif isinstance(response, list): pageResults = response logger.debug(f"Response is a list with {len(pageResults)} items") else: logger.warning(f"Unexpected response format: {type(response)}, value: {str(response)[:200]}") pageResults = [] logger.info(f"Got {len(pageResults)} pages from crawl for URL: {url}") if len(pageResults) == 0: logger.warning(f"Tavily crawl returned 0 pages for URL: {url}. Response structure: {type(response)}") if isinstance(response, dict): logger.warning(f"Response keys: {list(response.keys())}") # Log all values to debug (not just first 3) for key, value in response.items(): value_str = str(value) if len(value_str) > 200: value_str = value_str[:200] + "..." logger.warning(f" {key}: {type(value)} - {value_str}") # Check for error messages in response if "error" in response: logger.error(f"Tavily API error in response: {response.get('error')}") if "message" in response: logger.warning(f"Tavily API message: {response.get('message')}") elif isinstance(response, str): logger.warning(f"Tavily returned string response (first 500 chars): {response[:500]}") else: logger.warning(f"Unexpected response type: {type(response)}, value: {str(response)[:500]}") # Convert to WebCrawlResult format with error handling results = [] for idx, result in enumerate(pageResults): try: # Safely extract fields result_url = result.get("url") if isinstance(result, dict) else (getattr(result, "url", None) if hasattr(result, "url") else url) result_content = "" if isinstance(result, dict): result_content = result.get("raw_content") or result.get("content") or "" elif hasattr(result, "raw_content"): result_content = result.raw_content or "" elif hasattr(result, "content"): result_content = result.content or "" result_title = "" if isinstance(result, dict): result_title = result.get("title", "") elif hasattr(result, "title"): result_title = result.title or "" results.append(WebCrawlResult( url=result_url or url, content=result_content, title=result_title )) except Exception as resultError: logger.warning(f"Error processing crawl result {idx}: {resultError}") # Try to create a minimal result with at least the URL try: if isinstance(result, dict) and result.get("url"): results.append(WebCrawlResult( url=result.get("url", url), content="", title="" )) except Exception: logger.error(f"Failed to create minimal result for crawl result {idx}") continue logger.debug(f"Crawl successful: extracted {len(results)} pages from URL") return results except asyncio.TimeoutError: logger.warning(f"Crawl attempt {attempt + 1} timed out after {timeout} seconds for URL: {url}") if attempt < maxRetries: logger.info(f"Retrying in {retryDelay} seconds...") await asyncio.sleep(retryDelay) else: raise Exception(f"Crawl failed after {maxRetries + 1} attempts due to timeout") except Exception as e: logger.warning(f"Crawl attempt {attempt + 1} failed for URL {url}: {str(e)}") logger.debug(f"Full error details: {type(e).__name__}: {str(e)}", exc_info=True) # Check if it's a validation error and log more details if "validation" in str(e).lower(): logger.debug(f"URL validation failed. Checking URL format:") logger.debug(f" URL: '{url}' (length: {len(url)})") # Check for common URL issues if ' ' in url: logger.debug(f" WARNING: URL contains spaces!") if not url.startswith(('http://', 'https://')): logger.debug(f" WARNING: URL doesn't start with http/https!") if len(url) > 2000: logger.debug(f" WARNING: URL is very long ({len(url)} chars)") # Log API-specific errors error_str = str(e).lower() if "rate limit" in error_str or "429" in error_str: logger.error(f"Tavily API rate limit hit for URL: {url}") elif "401" in error_str or "unauthorized" in error_str: logger.error(f"Tavily API authentication failed for URL: {url}") elif "404" in error_str or "not found" in error_str: logger.warning(f"URL not found (404) for: {url}") elif "timeout" in error_str: logger.warning(f"Timeout error for URL: {url}") if attempt < maxRetries: logger.info(f"Retrying in {retryDelay} seconds...") await asyncio.sleep(retryDelay) else: logger.error(f"Crawl failed after {maxRetries + 1} attempts for URL: {url}") raise Exception(f"Crawl failed after {maxRetries + 1} attempts: {str(e)}") async def _routeWebOperation(self, modelCall: AiModelCall) -> "AiModelResponse": """ Route web operation based on operation type. Args: modelCall: AiModelCall with messages and options Returns: AiModelResponse based on operation type """ operationType = modelCall.options.operationType if operationType == OperationTypeEnum.WEB_SEARCH_DATA: return await self.webSearch(modelCall) elif operationType == OperationTypeEnum.WEB_CRAWL: return await self.webCrawl(modelCall) else: # Unsupported operation type return AiModelResponse( content="", success=False, error=f"Unsupported operation type: {operationType}" ) async def webSearch(self, modelCall: AiModelCall) -> "AiModelResponse": """ WEB_SEARCH_DATA operation - returns list of URLs using Tavily search. Args: modelCall: AiModelCall with AiCallPromptWebSearch as prompt Returns: AiModelResponse with JSON list of URLs """ try: # Extract parameters - find user message (not system message) promptContent = "" if modelCall.messages: for msg in modelCall.messages: if msg.get("role") == "user": promptContent = msg.get("content", "") break # Fallback to first message if no user message found if not promptContent and len(modelCall.messages) > 0: promptContent = modelCall.messages[0].get("content", "") if not promptContent or not promptContent.strip(): raise ValueError("Empty prompt content received for web search") import json try: promptData = json.loads(promptContent) except json.JSONDecodeError as e: logger.error(f"Failed to parse prompt content as JSON: {promptContent[:200]}") raise ValueError(f"Invalid JSON in prompt content: {str(e)}") # Create Pydantic model webSearchPrompt = AiCallPromptWebSearch(**promptData) # Convert ISO country code to country name for Tavily countryName = webSearchPrompt.country if countryName: countryName = self._convertIsoCodeToCountryName(countryName) # Perform search - use exact parameters from prompt # NOTE: timeRange parameter causes generic results, so we don't use it searchResults = await self._search( query=webSearchPrompt.instruction, maxResults=webSearchPrompt.maxNumberPages, timeRange=None, # Not used - causes generic results country=countryName, includeAnswer="basic", includeRawContent="text" ) # Extract URLs and content from results with error handling urls = [] results_with_content = [] content_count = 0 try: for result in searchResults: try: # Safely extract URL url = result.url if hasattr(result, 'url') and result.url else "" if url: urls.append(url) # Safely extract content content = "" if hasattr(result, 'rawContent'): content = result.rawContent or "" if not content and hasattr(result, 'content'): content = result.content or "" if content: content_count += 1 # Safely extract title title = result.title if hasattr(result, 'title') and result.title else "" results_with_content.append({ "url": url, "title": title, "content": content, "score": getattr(result, 'score', 0) }) except Exception as resultError: logger.warning(f"Error processing individual search result: {resultError}") # Continue processing other results continue logger.info(f"Tavily search: {len(urls)} URLs, {content_count} with content, {len(results_with_content)} total results") if content_count == 0: logger.warning("Tavily search returned no content - results may need crawling") except Exception as extractionError: logger.error(f"Error extracting URLs and content from search results: {extractionError}") # Try to recover at least URLs try: urls = [result.url for result in searchResults if hasattr(result, 'url') and result.url] logger.info(f"Recovered {len(urls)} URLs after extraction error") except Exception: logger.error("Failed to recover any URLs from search results") # Return both URLs and full results in JSON for direct extraction # Format: {"urls": [...], "results": [...]} import json response_data = { "urls": urls, "results": results_with_content } return AiModelResponse( content=json.dumps(response_data, indent=2), success=True, metadata={ "total_urls": len(urls), "operation": "WEB_SEARCH_DATA", "results_with_content": results_with_content # Also in metadata for compatibility } ) except Exception as e: logger.error(f"Error in Tavily web search: {str(e)}", exc_info=True) import json # Return error response with empty results error_response = { "urls": [], "results": [], "error": str(e) } return AiModelResponse( content=json.dumps(error_response, indent=2), success=False, error=str(e) ) async def webCrawl(self, modelCall: AiModelCall) -> "AiModelResponse": """ WEB_CRAWL operation - crawls one URL using Tavily with link following. Args: modelCall: AiModelCall with AiCallPromptWebCrawl as prompt Returns: AiModelResponse with crawl results as JSON (may include multiple pages) """ try: # Extract parameters - find user message (not system message) promptContent = "" if modelCall.messages: for msg in modelCall.messages: if msg.get("role") == "user": promptContent = msg.get("content", "") break # Fallback to first message if no user message found if not promptContent and len(modelCall.messages) > 0: promptContent = modelCall.messages[0].get("content", "") if not promptContent or not promptContent.strip(): raise ValueError("Empty prompt content received for web crawl") import json try: promptData = json.loads(promptContent) except json.JSONDecodeError as e: logger.error(f"Failed to parse prompt content as JSON: {promptContent[:200]}") raise ValueError(f"Invalid JSON in prompt content: {str(e)}") # Create Pydantic model webCrawlPrompt = AiCallPromptWebCrawl(**promptData) # Perform crawl for ONE URL with link following # Use maxWidth as limit, maxDepth as maxDepth, and calculate maxBreadth crawlResults = await self._crawl( url=webCrawlPrompt.url, instructions=webCrawlPrompt.instruction, limit=webCrawlPrompt.maxWidth or 20, # maxWidth controls number of pages maxDepth=webCrawlPrompt.maxDepth or 2, maxBreadth=webCrawlPrompt.maxWidth or 40 # Use same as limit for breadth ) # Format multiple pages from the crawl into a single response if crawlResults and len(crawlResults) > 0: # Get all pages content with error handling allContent = "" pageUrls = [] for i, result in enumerate(crawlResults, 1): try: pageHeader = f"\n{'='*60}\nPAGE {i}: {result.url}\n{'='*60}\n" if result.title: allContent += f"{pageHeader}Title: {result.title}\n\n" else: allContent += f"{pageHeader}\n" allContent += f"{result.content or ''}\n" pageUrls.append(result.url) except Exception as pageError: logger.warning(f"Error formatting page {i} from crawl: {pageError}") # Try to add at least the URL try: pageUrls.append(result.url if hasattr(result, 'url') and result.url else webCrawlPrompt.url) except Exception: pass resultData = { "url": webCrawlPrompt.url, "title": crawlResults[0].title if crawlResults and crawlResults[0].title else "Content", "content": allContent, "pagesCrawled": len(crawlResults), "pageUrls": pageUrls } logger.info(f"Crawl successful: {len(crawlResults)} pages extracted from {webCrawlPrompt.url}") else: logger.warning(f"Crawl returned no results for URL: {webCrawlPrompt.url}") resultData = { "url": webCrawlPrompt.url, "title": "", "content": "", "error": "No content extracted - Tavily crawl returned 0 pages", "pagesCrawled": 0, "pageUrls": [] } # Return as JSON - same format as Perplexity but with multiple pages content import json return AiModelResponse( content=json.dumps(resultData, indent=2), success=True, metadata={"operation": "WEB_CRAWL", "url": webCrawlPrompt.url, "pagesCrawled": len(crawlResults) if crawlResults else 0} ) except Exception as e: logger.error(f"Error in Tavily web crawl: {str(e)}", exc_info=True) import json crawl_url = webCrawlPrompt.url if 'webCrawlPrompt' in locals() else "" errorResult = { "url": crawl_url, "title": "", "content": "", "error": str(e), "pagesCrawled": 0, "pageUrls": [] } return AiModelResponse( content=json.dumps(errorResult, indent=2), success=False, error=str(e) )