"""Tavily web search class."""

import logging
import os
from dataclasses import dataclass

from tavily import AsyncTavilyClient

from modules.interfaces.interface_web_model import (
    WebCrawlActionDocument,
    WebCrawlActionResult,
    WebCrawlBase,
    WebCrawlDocumentData,
    WebCrawlRequest,
    WebCrawlResultItem,
    WebScrapeActionDocument,
    WebScrapeActionResult,
    WebScrapeBase,
    WebScrapeDocumentData,
    WebScrapeRequest,
    WebScrapeResultItem,
    WebSearchActionDocument,
    WebSearchActionResult,
    WebSearchBase,
    WebSearchDocumentData,
    WebSearchRequest,
    WebSearchResultItem,
)
from modules.shared.timezoneUtils import get_utc_timestamp

logger = logging.getLogger(__name__)


@dataclass
class TavilySearchResult:
    """One hit returned by the Tavily search API."""

    # Page title as reported by Tavily.
    title: str
    # URL of the hit.
    url: str


@dataclass
class TavilyCrawlResult:
    """Extracted text content for one URL from the Tavily extract API."""

    # URL the content was extracted from.
    url: str
    # Raw extracted text content.
    content: str


@dataclass
class ConnectorTavily(WebSearchBase, WebCrawlBase, WebScrapeBase):
    """Web search / crawl / scrape connector backed by the Tavily API.

    Construct via the async factory :meth:`create`, which wires up the
    Tavily client from the ``TAVILY_API_KEY`` environment variable.
    """

    # Set by create(); remains None on a directly-constructed instance.
    client: AsyncTavilyClient | None = None

    @classmethod
    async def create(cls) -> "ConnectorTavily":
        """Async factory: return a connector with a configured Tavily client.

        Reads the API key from the ``TAVILY_API_KEY`` environment variable
        (may be unset — the client then receives ``api_key=None``).
        """
        return cls(client=AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY")))

    async def search_urls(self, request: WebSearchRequest) -> WebSearchActionResult:
        """Handle a web search request.

        Takes a query and returns a list of URLs.  Never raises: any failure
        (API error, bad ``max_results``, result-building error) is reported
        via ``success=False`` and ``error`` on the returned result.
        """
        # Step 1: Search
        try:
            search_results = await self._search(request.query, request.max_results)
        except Exception as e:
            return WebSearchActionResult(success=False, error=str(e))
        # Step 2: Build ActionResult
        try:
            return self._build_search_action_result(search_results, request.query)
        except Exception as e:
            return WebSearchActionResult(success=False, error=str(e))

    async def crawl_urls(self, request: WebCrawlRequest) -> WebCrawlActionResult:
        """Crawl the given URLs and return the extracted text content.

        Never raises: failures are reported via ``success=False``/``error``.
        """
        # Step 1: Crawl
        try:
            crawl_results = await self._crawl(request.urls)
        except Exception as e:
            return WebCrawlActionResult(success=False, error=str(e))
        # Step 2: Build ActionResult
        try:
            return self._build_crawl_action_result(crawl_results, request.urls)
        except Exception as e:
            return WebCrawlActionResult(success=False, error=str(e))

    async def scrape(self, request: WebScrapeRequest) -> WebScrapeActionResult:
        """Turn a query into a list of URLs with extracted content.

        Pipeline: search for the query, then crawl each result URL.
        Never raises: failures are reported via ``success=False``/``error``.
        """
        # Step 1: Search
        try:
            search_results = await self._search(request.query, request.max_results)
        except Exception as e:
            return WebScrapeActionResult(success=False, error=str(e))
        # Step 2: Crawl
        try:
            urls = [result.url for result in search_results]
            crawl_results = await self._crawl(urls)
        except Exception as e:
            return WebScrapeActionResult(success=False, error=str(e))
        # Step 3: Build ActionResult
        try:
            return self._build_scrape_action_result(crawl_results, request.query)
        except Exception as e:
            return WebScrapeActionResult(success=False, error=str(e))

    async def _search(self, query: str, max_results: int) -> list[TavilySearchResult]:
        """Call the Tavily API to perform a web search.

        Raises:
            ValueError: if ``max_results`` is outside the 0..20 range
                accepted by the Tavily API.
        """
        # Make sure max_results is within the allowed range
        if max_results < 0 or max_results > 20:
            raise ValueError("max_results must be between 0 and 20")
        # Perform actual API call
        response = await self.client.search(query=query, max_results=max_results)
        # Lazy %-formatting so the message is only built when INFO is enabled.
        logger.info("Tavily API response:\n%s", response)
        return [
            TavilySearchResult(title=result["title"], url=result["url"])
            for result in response["results"]
        ]

    def _build_search_action_result(
        self, search_results: list[TavilySearchResult], query: str = ""
    ) -> WebSearchActionResult:
        """Build the ActionResult from the search results."""
        # Convert to result items
        result_items = [
            WebSearchResultItem(title=result.title, url=result.url)
            for result in search_results
        ]
        # Create document data with all results
        document_data = WebSearchDocumentData(
            query=query, results=result_items, total_count=len(result_items)
        )
        # Create single document
        document = WebSearchActionDocument(
            documentName=f"web_search_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )
        return WebSearchActionResult(
            success=True, documents=[document], resultLabel="web_search_results"
        )

    async def _crawl(self, urls: list[str]) -> list[TavilyCrawlResult]:
        """Call the Tavily API to extract text content from URLs."""
        response = await self.client.extract(
            urls=urls, extract_depth="advanced", format="text"
        )
        return [
            TavilyCrawlResult(url=result["url"], content=result["raw_content"])
            for result in response["results"]
        ]

    def _build_crawl_action_result(
        self, crawl_results: list[TavilyCrawlResult], urls: list[str] | None = None
    ) -> WebCrawlActionResult:
        """Build the ActionResult from the crawl results.

        When ``urls`` is not supplied, the URL list is reconstructed from
        the crawl results themselves.
        """
        # Convert to result items
        result_items = [
            WebCrawlResultItem(url=result.url, content=result.content)
            for result in crawl_results
        ]
        # Create document data with all results
        document_data = WebCrawlDocumentData(
            urls=urls or [result.url for result in crawl_results],
            results=result_items,
            total_count=len(result_items),
        )
        # Create single document
        document = WebCrawlActionDocument(
            documentName=f"web_crawl_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )
        return WebCrawlActionResult(
            success=True, documents=[document], resultLabel="web_crawl_results"
        )

    def _build_scrape_action_result(
        self, crawl_results: list[TavilyCrawlResult], query: str = ""
    ) -> WebScrapeActionResult:
        """Build the ActionResult from the scrape results."""
        # Convert to result items
        result_items = [
            WebScrapeResultItem(url=result.url, content=result.content)
            for result in crawl_results
        ]
        # Create document data with all results
        document_data = WebScrapeDocumentData(
            query=query,
            results=result_items,
            total_count=len(result_items),
        )
        # Create single document
        document = WebScrapeActionDocument(
            documentName=f"web_scrape_results_{get_utc_timestamp()}.json",
            documentData=document_data,
            mimeType="application/json",
        )
        return WebScrapeActionResult(
            success=True, documents=[document], resultLabel="web_scrape_results"
        )