"""Tavily web search class."""

import asyncio
import logging
import os
from dataclasses import dataclass
from typing import Optional

from tavily import AsyncTavilyClient

from modules.interfaces.interfaceWebModel import (
    WebCrawlBase,
    WebCrawlDocumentData,
    WebCrawlRequest,
    WebCrawlResultItem,
    WebScrapeActionDocument,
    WebScrapeActionResult,
    WebScrapeBase,
    WebScrapeDocumentData,
    WebScrapeRequest,
    WebScrapeResultItem,
    WebSearchBase,
    WebSearchRequest,
    WebSearchActionResult,
    WebSearchActionDocument,
    WebSearchDocumentData,
    WebSearchResultItem,
    WebCrawlActionDocument,
    WebCrawlActionResult,
    get_web_search_min_results,
    get_web_search_max_results,
)

# from modules.interfaces.interfaceChatModel import ActionResult, ActionDocument
from modules.shared.timezoneUtils import get_utc_timestamp
from modules.shared.configuration import APP_CONFIG

logger = logging.getLogger(__name__)


# Configuration loading functions
def get_web_crawl_timeout() -> int:
    """Return the per-attempt crawl timeout in seconds.

    Read from the ``Web_Crawl_TIMEOUT`` configuration key (default ``30``).
    """
    return int(APP_CONFIG.get("Web_Crawl_TIMEOUT", "30"))


def get_web_crawl_max_retries() -> int:
    """Return how many times a failed crawl is retried.

    Read from the ``Web_Crawl_MAX_RETRIES`` configuration key (default ``3``).
    """
    return int(APP_CONFIG.get("Web_Crawl_MAX_RETRIES", "3"))


def get_web_crawl_retry_delay() -> int:
    """Return the pause between crawl attempts in seconds.

    Read from the ``Web_Crawl_RETRY_DELAY`` configuration key (default ``2``).
    """
    return int(APP_CONFIG.get("Web_Crawl_RETRY_DELAY", "2"))


@dataclass
class TavilySearchResult:
    """Title and URL of a single hit returned by a Tavily search."""

    title: str
    url: str


@dataclass
class TavilyCrawlResult:
    """URL and extracted raw content of a single page returned by Tavily extract."""

    url: str
    content: str


@dataclass
class ConnectorTavily(WebSearchBase, WebCrawlBase, WebScrapeBase):
    """Tavily-backed implementation of the web search, crawl and scrape interfaces.

    Use :meth:`create` to obtain an instance with a configured client.
    """

    # Async Tavily client; None until supplied (``create`` always supplies one).
    client: Optional[AsyncTavilyClient] = None

    @classmethod
    async def create(cls):
        """Build a connector using the API key from the application configuration.

        Returns:
            A connector whose ``client`` is an ``AsyncTavilyClient``.

        Raises:
            ValueError: If ``Connector_WebTavily_API_KEY`` is missing or empty.
        """
        api_key = APP_CONFIG.get("Connector_WebTavily_API_KEY")
        if not api_key:
            raise ValueError(
                "Tavily API key not configured. Please set Connector_WebTavily_API_KEY in config.ini"
            )
        return cls(client=AsyncTavilyClient(api_key=api_key))

    async def search_urls(self, request: WebSearchRequest) -> WebSearchActionResult:
        """Handle a web search request: take a query and return a list of URLs.

        Args:
            request: Carries the ``query`` and ``max_results`` to search with.

        Returns:
            A successful result holding one JSON document with the hits, or a
            result with ``success=False`` and the error text if any step raised.
        """
        try:
            # Step 1: Search
            search_results = await self._search(request.query, request.max_results)
            # Step 2: Build ActionResult
            return self._build_search_action_result(search_results, request.query)
        except Exception as e:
            # Failures are reported through the result object, never raised.
            return WebSearchActionResult(success=False, error=str(e))

    async def crawl_urls(self, request: WebCrawlRequest) -> WebCrawlActionResult:
        """Crawl the given URLs and return the extracted text content.

        Args:
            request: Carries the ``urls`` to extract.

        Returns:
            A successful result holding one JSON document with the page
            contents, or a result with ``success=False`` and the error text if
            any step raised.
        """
        try:
            # Step 1: Crawl
            crawl_results = await self._crawl(request.urls)
            # Step 2: Build ActionResult
            return self._build_crawl_action_result(crawl_results, request.urls)
        except Exception as e:
            return WebCrawlActionResult(success=False, error=str(e))

    async def scrape(self, request: WebScrapeRequest) -> WebScrapeActionResult:
        """Turn a query into a list of URLs with extracted content.

        Runs a search for ``request.query`` and then extracts the content of
        every URL the search returned.

        Args:
            request: Carries the ``query`` and ``max_results`` to search with.

        Returns:
            A successful result holding one JSON document with the page
            contents, or a result with ``success=False`` and the error text if
            any step raised.
        """
        try:
            # Step 1: Search
            search_results = await self._search(request.query, request.max_results)
            # Step 2: Crawl
            urls = [hit.url for hit in search_results]
            crawl_results = await self._crawl(urls)
            # Step 3: Build ActionResult
            return self._build_scrape_action_result(crawl_results, request.query)
        except Exception as e:
            return WebScrapeActionResult(success=False, error=str(e))

    async def _search(self, query: str, max_results: int) -> list[TavilySearchResult]:
        """Call the Tavily API to perform a web search.

        Args:
            query: Search text passed to Tavily.
            max_results: Number of hits to request; must lie within the
                configured minimum and maximum.

        Returns:
            One ``TavilySearchResult`` per entry in the response's ``results``.

        Raises:
            ValueError: If ``max_results`` is outside the allowed range.
        """
        # Make sure max_results is within the allowed range
        min_results = get_web_search_min_results()
        max_allowed_results = get_web_search_max_results()
        if not min_results <= max_results <= max_allowed_results:
            raise ValueError(
                f"max_results must be between {min_results} and {max_allowed_results}"
            )

        # Perform actual API call
        response = await self.client.search(query=query, max_results=max_results)

        return [
            TavilySearchResult(title=result["title"], url=result["url"])
            for result in response["results"]
        ]

    def _build_search_action_result(
        self, search_results: list[TavilySearchResult], query: str = ""
    ) -> WebSearchActionResult:
        """Build the ActionResult from the search results.

        Args:
            search_results: Hits to include in the document.
            query: The query that produced the hits; stored in the document data.

        Returns:
            A successful result containing a single JSON document.
        """
        # Convert to result items
        result_items = [
            WebSearchResultItem(title=result.title, url=result.url)
            for result in search_results
        ]

        # Create document data with all results
        document_data = WebSearchDocumentData(
            query=query, results=result_items, total_count=len(result_items)
        )

        # Create single document
        document = WebSearchActionDocument(
            documentName=f"web_search_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )

        return WebSearchActionResult(
            success=True, documents=[document], resultLabel="web_search_results"
        )

    async def _crawl(self, urls: list) -> list[TavilyCrawlResult]:
        """Call the Tavily API to extract text content from URLs, with retries.

        Each attempt is bounded by the configured timeout. A failed attempt is
        retried after the configured delay until the retry budget is used up.

        Args:
            urls: URLs to extract. An empty (or ``None``) value yields an
                empty list without contacting the API.

        Returns:
            One ``TavilyCrawlResult`` per entry in the response's ``results``.

        Raises:
            Exception: If every attempt timed out or failed. The last
                underlying error is attached as ``__cause__``.
        """
        # Nothing to extract: skip the API call and the retry/sleep cycle.
        if not urls:
            return []

        # A negative setting means "no retries"; always make one attempt so
        # the method never falls through and returns None.
        max_retries = max(0, get_web_crawl_max_retries())
        retry_delay = get_web_crawl_retry_delay()
        timeout = get_web_crawl_timeout()
        total_attempts = max_retries + 1

        for attempt in range(1, total_attempts + 1):
            is_last_attempt = attempt == total_attempts
            try:
                # Use asyncio.wait_for for timeout
                response = await asyncio.wait_for(
                    self.client.extract(urls=urls, extract_depth="advanced", format="text"),
                    timeout=timeout,
                )
                # Parsing stays inside the try so a malformed response is
                # retried like any other failure.
                return [
                    TavilyCrawlResult(url=result["url"], content=result["raw_content"])
                    for result in response["results"]
                ]
            except asyncio.TimeoutError as e:
                logger.warning(
                    "Crawl attempt %d timed out after %s seconds", attempt, timeout
                )
                if is_last_attempt:
                    raise Exception(
                        f"Crawl failed after {total_attempts} attempts due to timeout"
                    ) from e
            except Exception as e:
                logger.warning("Crawl attempt %d failed: %s", attempt, e)
                if is_last_attempt:
                    raise Exception(
                        f"Crawl failed after {total_attempts} attempts: {e}"
                    ) from e

            # Only reached after a failed, non-final attempt.
            logger.info("Retrying in %s seconds...", retry_delay)
            await asyncio.sleep(retry_delay)

    def _build_crawl_action_result(
        self, crawl_results: list[TavilyCrawlResult], urls: Optional[list[str]] = None
    ) -> WebCrawlActionResult:
        """Build the ActionResult from the crawl results.

        Args:
            crawl_results: Extracted pages to include in the document.
            urls: The URLs that were requested. When empty or ``None``, the
                URLs of ``crawl_results`` are used instead.

        Returns:
            A successful result containing a single JSON document.
        """
        # Convert to result items
        result_items = [
            WebCrawlResultItem(url=result.url, content=result.content)
            for result in crawl_results
        ]

        # Create document data with all results
        document_data = WebCrawlDocumentData(
            urls=urls or [result.url for result in crawl_results],
            results=result_items,
            total_count=len(result_items),
        )

        # Create single document
        document = WebCrawlActionDocument(
            documentName=f"web_crawl_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )

        return WebCrawlActionResult(
            success=True, documents=[document], resultLabel="web_crawl_results"
        )

    def _build_scrape_action_result(
        self, crawl_results: list[TavilyCrawlResult], query: str = ""
    ) -> WebScrapeActionResult:
        """Build the ActionResult from the scrape results.

        Args:
            crawl_results: Extracted pages to include in the document.
            query: The query that led to these pages; stored in the document data.

        Returns:
            A successful result containing a single JSON document.
        """
        # Convert to result items
        result_items = [
            WebScrapeResultItem(url=result.url, content=result.content)
            for result in crawl_results
        ]

        # Create document data with all results
        document_data = WebScrapeDocumentData(
            query=query,
            results=result_items,
            total_count=len(result_items),
        )

        # Create single document
        document = WebScrapeActionDocument(
            documentName=f"web_scrape_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )

        return WebScrapeActionResult(
            success=True, documents=[document], resultLabel="web_scrape_results"
        )