diff --git a/modules/aicore/aicorePluginPerplexity.py b/modules/aicore/aicorePluginPerplexity.py index 01fde7df..23bffc48 100644 --- a/modules/aicore/aicorePluginPerplexity.py +++ b/modules/aicore/aicorePluginPerplexity.py @@ -4,7 +4,8 @@ from typing import List from fastapi import HTTPException from modules.shared.configuration import APP_CONFIG from modules.aicore.aicoreBase import BaseConnectorAi -from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings +from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings, AiCallPromptWebSearch, AiCallPromptWebCrawl +from modules.datamodels.datamodelTools import CountryCodes # Configure logger logger = logging.getLogger(__name__) @@ -40,6 +41,13 @@ class AiPerplexity(BaseConnectorAi): """Get the connector type identifier.""" return "perplexity" + def _convertIsoCodeToCountryName(self, isoCode: str) -> str: + """ + Convert ISO-2 country code to Perplexity country name. + Uses centralized CountryCodes mapping. + """ + return CountryCodes.getForPerplexity(isoCode) + def getModels(self) -> List[AiModel]: """Get all available Perplexity models.""" return [ @@ -56,15 +64,12 @@ class AiPerplexity(BaseConnectorAi): speedRating=8, qualityRating=8, # capabilities removed (not used in business logic) - functionCall=self.callWebOperation, + functionCall=self._routeWebOperation, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.ADVANCED, operationTypes=createOperationTypeRatings( - (OperationTypeEnum.WEB_RESEARCH, 8), (OperationTypeEnum.WEB_SEARCH, 9), - (OperationTypeEnum.WEB_CRAWL, 7), - (OperationTypeEnum.WEB_NEWS, 8), - (OperationTypeEnum.WEB_QUESTIONS, 9) + (OperationTypeEnum.WEB_CRAWL, 7) ), version="sonar", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.005 @@ -82,15 +87,12 @@ class AiPerplexity(BaseConnectorAi): speedRating=6, # Slower due to AI analysis qualityRating=10, # Best AI analysis quality # capabilities removed (not used in business logic) - functionCall=self.callWebOperation, + functionCall=self._routeWebOperation, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( - (OperationTypeEnum.WEB_RESEARCH, 10), (OperationTypeEnum.WEB_SEARCH, 9), - (OperationTypeEnum.WEB_CRAWL, 8), - (OperationTypeEnum.WEB_NEWS, 8), - (OperationTypeEnum.WEB_QUESTIONS, 9) + (OperationTypeEnum.WEB_CRAWL, 8) ), version="sonar-pro", calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.01 + (bytesReceived / 4 / 1000) * 0.01 @@ -133,747 +135,43 @@ class AiPerplexity(BaseConnectorAi): ) if response.status_code != 200: - error_detail = f"Perplexity API error: {response.status_code} - {response.text}" - logger.error(error_detail) + errorDetail = f"Perplexity API error: {response.status_code} - {response.text}" + logger.error(errorDetail) # Provide more specific error messages based on status code if response.status_code == 429: - error_message = "Rate limit exceeded. Please wait before making another request." + errorMessage = "Rate limit exceeded. Please wait before making another request." elif response.status_code == 401: - error_message = "Invalid API key. Please check your Perplexity API configuration." + errorMessage = "Invalid API key. 
Please check your Perplexity API configuration." elif response.status_code == 400: - error_message = f"Invalid request to Perplexity API: {response.text}" + errorMessage = f"Invalid request to Perplexity API: {response.text}" else: - error_message = f"Perplexity API error ({response.status_code}): {response.text}" + errorMessage = f"Perplexity API error ({response.status_code}): {response.text}" - raise HTTPException(status_code=500, detail=error_message) + raise HTTPException(status_code=500, detail=errorMessage) - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] + apiResponse = response.json() + content = apiResponse["choices"][0]["message"]["content"] return AiModelResponse( content=content, success=True, modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} + metadata={"response_id": apiResponse.get("id", "")} ) except Exception as e: logger.error(f"Error calling Perplexity API: {str(e)}") raise HTTPException(status_code=500, detail=f"Error calling Perplexity API: {str(e)}") - async def callAiWithWebSearch(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Calls Perplexity API with web search capabilities for research using standardized pattern. - - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with content and metadata - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - # Parse unified prompt JSON format - promptContent = messages[0]["content"] if messages else "" - import json - promptData = json.loads(promptContent) - - # Create a more specific prompt for Perplexity based on the unified format - searchPrompt = promptData.get("searchPrompt", promptContent) - maxResults = promptData.get("maxResults", 5) - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - - # Create enhanced prompt for Perplexity - enhancedPrompt = f"""Search the web for: {searchPrompt} -Please provide a comprehensive response with relevant URLs and information. -Focus on finding {maxResults} most relevant results. -{f"Limit results to the last {timeRange}" if timeRange else ""} -{f"Focus on {country}" if country else ""} -{f"Provide results in {language}" if language else ""} -Please format your response as a JSON object with the following structure: -{{ - "query": "{searchPrompt}", - "results": [ - {{ - "title": "Result title", - "url": "https://example.com", - "content": "Brief description or excerpt" - }} - ], - "total_count": number_of_results -}} -Include actual URLs in your response.""" - - # Update the messages with the enhanced prompt - enhancedMessages = [{"role": "user", "content": enhancedPrompt}] - - payload = { - "model": model.name, - "messages": enhancedMessages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Web Search API error: {response.status_code} - {response.text}" - logger.error(error_detail) - - if response.status_code == 429: - error_message = "Rate limit exceeded for web search. Please wait before making another request." - elif response.status_code == 401: - error_message = "Invalid API key for web search. 
Please check your Perplexity API configuration." - elif response.status_code == 400: - error_message = f"Invalid request to Perplexity Web Search API: {response.text}" - else: - error_message = f"Perplexity Web Search API error ({response.status_code}): {response.text}" - - raise HTTPException(status_code=500, detail=error_message) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error calling Perplexity Web Search API: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error calling Perplexity Web Search API: {str(e)}") - - async def researchTopic(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Research a topic using Perplexity's web search capabilities using standardized pattern. - - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with research content - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - payload = { - "model": model.name, - "messages": messages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Research API error: {response.status_code} - {response.text}" - logger.error(error_detail) - - if response.status_code == 429: - error_message = "Rate limit exceeded for research. Please wait before making another request." - elif response.status_code == 401: - error_message = "Invalid API key for research. Please check your Perplexity API configuration." - elif response.status_code == 400: - error_message = f"Invalid request to Perplexity Research API: {response.text}" - else: - error_message = f"Perplexity Research API error ({response.status_code}): {response.text}" - - raise HTTPException(status_code=500, detail=error_message) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error researching topic: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error researching topic: {str(e)}") - - async def answerQuestion(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Answer a question using web search for current information using standardized pattern. 
- - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with answer content - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - payload = { - "model": model.name, - "messages": messages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Q&A API error: {response.status_code} - {response.text}" - logger.error(error_detail) - - if response.status_code == 429: - error_message = "Rate limit exceeded for Q&A. Please wait before making another request." - elif response.status_code == 401: - error_message = "Invalid API key for Q&A. Please check your Perplexity API configuration." - elif response.status_code == 400: - error_message = f"Invalid request to Perplexity Q&A API: {response.text}" - else: - error_message = f"Perplexity Q&A API error ({response.status_code}): {response.text}" - - raise HTTPException(status_code=500, detail=error_message) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error answering question: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error answering question: {str(e)}") - - async def getCurrentNews(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Get current news on a specific topic using standardized pattern. - - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with news content - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - payload = { - "model": model.name, - "messages": messages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity News API error: {response.status_code} - {response.text}" - logger.error(error_detail) - - if response.status_code == 429: - error_message = "Rate limit exceeded for news. Please wait before making another request." - elif response.status_code == 401: - error_message = "Invalid API key for news. Please check your Perplexity API configuration." 
- elif response.status_code == 400: - error_message = f"Invalid request to Perplexity News API: {response.text}" - else: - error_message = f"Perplexity News API error ({response.status_code}): {response.text}" - - raise HTTPException(status_code=500, detail=error_message) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error getting current news: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error getting current news: {str(e)}") - - async def crawl(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Crawl URLs using Perplexity's web search capabilities for content extraction. - - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with content and metadata - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - # Parse unified prompt JSON format - promptContent = messages[0]["content"] if messages else "" - import json - promptData = json.loads(promptContent) - - # Extract parameters from unified prompt JSON - urls = promptData.get("urls", []) - extractDepth = promptData.get("extractDepth", "advanced") - formatType = promptData.get("format", "markdown") - - if not urls: - return AiModelResponse( - content="No URLs provided for crawling", - success=False, - error="No URLs found in prompt data" - ) - - # Create enhanced prompt for Perplexity to crawl URLs - urlsList = ", ".join(urls) - enhancedPrompt = f"""Please extract and analyze content from these URLs: {urlsList} -Extraction requirements: -- Extract depth: {extractDepth} -- Output format: {formatType} -- Focus on main content, not navigation or ads -- Preserve important structure and formatting - -Please format your response as a JSON object with the following structure: -{{ - "urls": {json.dumps(urls)}, - "results": [ - {{ - "url": "https://example.com", - "title": "Page title", - "content": "Extracted content in {formatType} format", - "extractedAt": "2024-01-01T00:00:00Z" - }} - ], - "total_count": number_of_urls_processed -}} - -Extract content from each URL and provide detailed analysis.""" - - # Update the messages with the enhanced prompt - enhancedMessages = [{"role": "user", "content": enhancedPrompt}] - - payload = { - "model": model.name, - "messages": enhancedMessages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Crawl API error: {response.status_code} - {response.text}" - logger.error(error_detail) - - if response.status_code == 429: - error_message = "Rate limit exceeded for crawl. Please wait before making another request." - elif response.status_code == 401: - error_message = "Invalid API key for crawl. Please check your Perplexity API configuration." 
- elif response.status_code == 400: - error_message = f"Invalid request to Perplexity Crawl API: {response.text}" - else: - error_message = f"Perplexity Crawl API error ({response.status_code}): {response.text}" - - raise HTTPException(status_code=500, detail=error_message) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error calling Perplexity Crawl API: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error calling Perplexity Crawl API: {str(e)}") - - async def callWebOperation(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Universal web operation handler that distributes to the correct method - based on the operationType from AiCallOptions. - """ - try: - options = modelCall.options - operationType = getattr(options, "operationType", None) - - if operationType == OperationTypeEnum.WEB_SEARCH: - return await self.callAiWithWebSearch(modelCall) - elif operationType == OperationTypeEnum.WEB_CRAWL: - return await self.crawl(modelCall) - elif operationType == OperationTypeEnum.WEB_RESEARCH: - return await self.research(modelCall) - elif operationType == OperationTypeEnum.WEB_QUESTIONS: - return await self.questions(modelCall) - elif operationType == OperationTypeEnum.WEB_NEWS: - return await self.news(modelCall) - else: - # Fallback to research for unknown operation types - return await self.research(modelCall) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - - async def research(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Research topics using Perplexity's web search capabilities. 
- - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with research content and metadata - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - # Parse unified prompt JSON format - promptContent = messages[0]["content"] if messages else "" - import json - promptData = json.loads(promptContent) - - # Extract parameters from unified prompt JSON - researchPrompt = promptData.get("researchPrompt", promptContent) - maxResults = promptData.get("maxResults", 8) - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - - # Create enhanced prompt for research - enhancedPrompt = f"""Conduct comprehensive research on: {researchPrompt} - -Research requirements: -- Provide detailed analysis and insights -- Include multiple perspectives and sources -- Focus on finding {maxResults} most relevant sources -{f"Limit results to the last {timeRange}" if timeRange else ""} -{f"Focus on {country}" if country else ""} -{f"Provide results in {language}" if language else ""} - -Please format your response as a JSON object with the following structure: -{{ - "query": "{researchPrompt}", - "research_results": [ - {{ - "title": "Source title", - "url": "https://example.com", - "summary": "Brief summary", - "content": "Detailed content", - "extractedAt": "2024-01-01T00:00:00Z" - }} - ], - "total_count": number_of_sources, - "operation_type": "research" -}} - -Provide comprehensive research with detailed analysis.""" - - # Update the messages with the enhanced prompt - enhancedMessages = [{"role": "user", "content": enhancedPrompt}] - - payload = { - "model": model.name, - "messages": enhancedMessages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Research API error: {response.status_code} - {response.text}" - logger.error(error_detail) - raise HTTPException(status_code=500, detail=error_detail) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error calling Perplexity Research API: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error calling Perplexity Research API: {str(e)}") - - async def questions(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Answer questions using Perplexity's web search capabilities. 
- - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with answer and supporting sources - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - # Parse unified prompt JSON format - promptContent = messages[0]["content"] if messages else "" - import json - promptData = json.loads(promptContent) - - # Extract parameters from unified prompt JSON - question = promptData.get("question", promptContent) - context = promptData.get("context", "") - maxResults = promptData.get("maxResults", 6) - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - - # Create enhanced prompt for questions - contextText = f"\nAdditional context: {context}" if context else "" - enhancedPrompt = f"""Answer this question using web research: {question}{contextText} - -Answer requirements: -- Provide a comprehensive answer with supporting evidence -- Include {maxResults} most relevant sources -- Cite sources with URLs -{f"Focus on recent information (last {timeRange})" if timeRange else ""} -{f"Focus on {country}" if country else ""} -{f"Provide answer in {language}" if language else ""} - -Please format your response as a JSON object with the following structure: -{{ - "question": "{question}", - "answer": "Comprehensive answer to the question", - "answer_sources": [ - {{ - "title": "Source title", - "url": "https://example.com", - "summary": "Brief summary", - "content": "Relevant content excerpt", - "relevance": "Why this source is relevant" - }} - ], - "total_count": number_of_sources, - "operation_type": "questions" -}} - -Provide a detailed answer with well-cited sources.""" - - # Update the messages with the enhanced prompt - enhancedMessages = [{"role": "user", "content": enhancedPrompt}] - - payload = { - "model": model.name, - "messages": enhancedMessages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity Questions API error: {response.status_code} - {response.text}" - logger.error(error_detail) - raise HTTPException(status_code=500, detail=error_detail) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error calling Perplexity Questions API: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error calling Perplexity Questions API: {str(e)}") - - async def news(self, modelCall: AiModelCall) -> AiModelResponse: - """ - Search and analyze news using Perplexity's web search capabilities. 
- - Args: - modelCall: AiModelCall with messages and options - - Returns: - AiModelResponse with news articles and analysis - """ - try: - # Extract parameters from modelCall - messages = modelCall.messages - model = modelCall.model - options = modelCall.options - temperature = getattr(options, "temperature", None) - if temperature is None: - temperature = model.temperature - maxTokens = model.maxTokens - - # Parse unified prompt JSON format - promptContent = messages[0]["content"] if messages else "" - import json - promptData = json.loads(promptContent) - - # Extract parameters from unified prompt JSON - newsPrompt = promptData.get("newsPrompt", promptContent) - maxResults = promptData.get("maxResults", 10) - timeRange = promptData.get("timeRange", "w") # Default to week for news - country = promptData.get("country") - language = promptData.get("language") - - # Create enhanced prompt for news - enhancedPrompt = f"""Find and analyze recent news about: {newsPrompt} - -News requirements: -- Find {maxResults} most recent and relevant news articles -- Focus on the last {timeRange} (recent news) -- Include diverse sources and perspectives -{f"Focus on news from {country}" if country else ""} -{f"Provide news in {language}" if language else ""} - -Please format your response as a JSON object with the following structure: -{{ - "news_query": "{newsPrompt}", - "articles": [ - {{ - "title": "Article title", - "url": "https://example.com", - "content": "Article content", - "date": "2024-01-01", - "source": "News source name", - "summary": "Brief summary of the article" - }} - ], - "total_count": number_of_articles, - "operation_type": "news" -}} - -Provide comprehensive news coverage with analysis.""" - - # Update the messages with the enhanced prompt - enhancedMessages = [{"role": "user", "content": enhancedPrompt}] - - payload = { - "model": model.name, - "messages": enhancedMessages, - "temperature": temperature, - "max_tokens": maxTokens - } - - response = await self.httpClient.post( - model.apiUrl, - json=payload - ) - - if response.status_code != 200: - error_detail = f"Perplexity News API error: {response.status_code} - {response.text}" - logger.error(error_detail) - raise HTTPException(status_code=500, detail=error_detail) - - responseJson = response.json() - content = responseJson["choices"][0]["message"]["content"] - - return AiModelResponse( - content=content, - success=True, - modelId=model.name, - metadata={"response_id": responseJson.get("id", "")} - ) - - except Exception as e: - logger.error(f"Error calling Perplexity News API: {str(e)}") - raise HTTPException(status_code=500, detail=f"Error calling Perplexity News API: {str(e)}") - async def _testConnection(self) -> bool: """ Tests the connection to the Perplexity API. 
- + Returns: True if connection is successful, False otherwise """ @@ -883,9 +181,192 @@ Provide comprehensive news coverage with analysis.""" {"role": "user", "content": "Hello, please respond with just 'OK' to confirm the connection works."} ] - response = await self.callAiBasic(testMessages) - return response and len(response.strip()) > 0 + # Create a model call for testing + from modules.datamodels.datamodelAi import AiCallOptions + model = self.getModels()[0] # Get first model for testing + testCall = AiModelCall( + messages=testMessages, + model=model, + options=AiCallOptions() + ) + + response = await self.callAiBasic(testCall) + return response.success and len(response.content.strip()) > 0 except Exception as e: logger.error(f"Perplexity connection test failed: {str(e)}") return False + + async def _routeWebOperation(self, modelCall: AiModelCall) -> AiModelResponse: + """ + Route web operation based on operation type. + + Args: + modelCall: AiModelCall with messages and options + + Returns: + AiModelResponse based on operation type + """ + operationType = modelCall.options.operationType + + if operationType == OperationTypeEnum.WEB_SEARCH: + return await self.webSearch(modelCall) + elif operationType == OperationTypeEnum.WEB_CRAWL: + return await self.webCrawl(modelCall) + else: + # Fallback to basic call + return await self.callAiBasic(modelCall) + + async def webSearch(self, modelCall: AiModelCall) -> AiModelResponse: + """ + WEB_SEARCH operation - returns list of URLs based on search query. + + Args: + modelCall: AiModelCall with AiCallPromptWebSearch as prompt + + Returns: + AiModelResponse with JSON list of URLs + """ + try: + # Extract parameters + messages = modelCall.messages + model = modelCall.model + options = modelCall.options + temperature = getattr(options, "temperature", None) or model.temperature + maxTokens = model.maxTokens + + # Parse prompt JSON + promptContent = messages[0]["content"] if messages else "" + import json + promptData = json.loads(promptContent) + + # Create Pydantic model + webSearchPrompt = AiCallPromptWebSearch(**promptData) + + # Convert ISO country code to country name + countryName = webSearchPrompt.country + if countryName: + countryName = self._convertIsoCodeToCountryName(countryName) + + # Build search request for Perplexity + searchPrompt = f"""Search the web for: {webSearchPrompt.instruction} + +Return a JSON array of {webSearchPrompt.maxNumberPages} most relevant URLs. 
+{'' if not countryName else f'Focus on results from {countryName}.'} +{'' if not webSearchPrompt.timeRange else f'Limit to results from the last {webSearchPrompt.timeRange}'} +{'' if not webSearchPrompt.language else f'Return results in {webSearchPrompt.language} language'} + +Return ONLY a JSON array of URLs, no additional text: +[ + "https://example1.com/page", + "https://example2.com/article", + "https://example3.com/resource" +]""" + + payload = { + "model": model.name, + "messages": [{"role": "user", "content": searchPrompt}], + "temperature": temperature, + "max_tokens": maxTokens + } + + response = await self.httpClient.post(model.apiUrl, json=payload) + + if response.status_code != 200: + raise HTTPException(status_code=500, detail=f"Perplexity Web Search API error: {response.text}") + + apiResponse = response.json() + content = apiResponse["choices"][0]["message"]["content"] + + return AiModelResponse( + content=content, + success=True, + modelId=model.name, + metadata={"response_id": apiResponse.get("id", ""), "operation": "WEB_SEARCH"} + ) + + except Exception as e: + logger.error(f"Error in Perplexity web search: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error in Perplexity web search: {str(e)}") + + async def webCrawl(self, modelCall: AiModelCall) -> AiModelResponse: + """ + WEB_CRAWL operation - crawls ONE URL and returns content. + + Args: + modelCall: AiModelCall with AiCallPromptWebCrawl as prompt + + Returns: + AiModelResponse with crawl results as JSON object + """ + try: + # Extract parameters + messages = modelCall.messages + model = modelCall.model + options = modelCall.options + temperature = getattr(options, "temperature", None) or model.temperature + maxTokens = model.maxTokens + + # Parse prompt JSON + promptContent = messages[0]["content"] if messages else "" + import json + promptData = json.loads(promptContent) + + # Create Pydantic model + webCrawlPrompt = AiCallPromptWebCrawl(**promptData) + + # Build crawl request for Perplexity - ONE URL + crawlPrompt = f"""Crawl and extract content from this URL based on the instruction: + +INSTRUCTION: '{webCrawlPrompt.instruction}' + +URL to crawl (maxDepth={webCrawlPrompt.maxDepth}): +{webCrawlPrompt.url} + +Extract and return the relevant content based on the instruction. 
+Return as JSON object with this structure: +{{ + "url": "{webCrawlPrompt.url}", + "title": "Page title", + "content": "Extracted content relevant to the instruction" +}} + +Return ONLY valid JSON, no additional text.""" + + payload = { + "model": model.name, + "messages": [{"role": "user", "content": crawlPrompt}], + "temperature": temperature, + "max_tokens": maxTokens + } + + response = await self.httpClient.post(model.apiUrl, json=payload) + + if response.status_code != 200: + raise HTTPException(status_code=500, detail=f"Perplexity Web Crawl API error: {response.text}") + + apiResponse = response.json() + content = apiResponse["choices"][0]["message"]["content"] + + # Parse JSON content and ensure it's a single object + import json + try: + parsedContent = json.loads(content) + # Ensure it's a single object, not an array + if isinstance(parsedContent, list): + parsedContent = parsedContent[0] if parsedContent else {} + except: + # If not JSON, create structured response + parsedContent = {"url": webCrawlPrompt.url, "title": "", "content": content} + + # Return as JSON string + return AiModelResponse( + content=json.dumps(parsedContent, indent=2), + success=True, + modelId=model.name, + metadata={"response_id": apiResponse.get("id", ""), "operation": "WEB_CRAWL", "url": webCrawlPrompt.url} + ) + + except Exception as e: + logger.error(f"Error in Perplexity web crawl: {str(e)}") + raise HTTPException(status_code=500, detail=f"Error in Perplexity web crawl: {str(e)}") diff --git a/modules/aicore/aicorePluginTavily.py b/modules/aicore/aicorePluginTavily.py index f2cc0b6a..9320bba7 100644 --- a/modules/aicore/aicorePluginTavily.py +++ b/modules/aicore/aicorePluginTavily.py @@ -9,7 +9,8 @@ from typing import Optional, List, Dict from tavily import AsyncTavilyClient from modules.shared.configuration import APP_CONFIG from modules.aicore.aicoreBase import BaseConnectorAi -from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelResponse, createOperationTypeRatings +from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings, AiCallPromptWebSearch, AiCallPromptWebCrawl +from modules.datamodels.datamodelTools import CountryCodes logger = logging.getLogger(__name__) @@ -18,38 +19,13 @@ logger = logging.getLogger(__name__) class WebSearchResult: title: str url: str - raw_content: Optional[str] = None + rawContent: Optional[str] = None @dataclass class WebCrawlResult: url: str content: str - -@dataclass -class WebResearchRequest: - """Ultra-simplified web research request""" - user_prompt: str - urls: Optional[List[str]] = None - max_results: int = 5 - max_pages: int = 10 - search_depth: str = "basic" - extract_depth: str = "advanced" - format: str = "markdown" - country: Optional[str] = None - time_range: Optional[str] = None - topic: Optional[str] = None - language: Optional[str] = None - -@dataclass -class WebResearchResult: - """Ultra-simplified web research result - just success/error + documents""" - success: bool = True - error: Optional[str] = None - documents: List[dict] = None # Simple dict instead of ActionDocument - - def __post_init__(self): - if self.documents is None: - self.documents = [] + title: Optional[str] = None class ConnectorWeb(BaseConnectorAi): """Tavily web search connector.""" @@ -70,9 +46,9 @@ class ConnectorWeb(BaseConnectorAi): def _initializeClient(self): """Initialize the Tavily client if API key is 
available.""" try: - api_key = APP_CONFIG.get("Connector_AiTavily_API_SECRET") - if api_key: - self.client = AsyncTavilyClient(api_key=api_key) + apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET") + if apiKey: + self.client = AsyncTavilyClient(api_key=apiKey) logger.info("Tavily client initialized successfully") else: logger.warning("Tavily API key not found, client not initialized") @@ -83,70 +59,31 @@ class ConnectorWeb(BaseConnectorAi): """Get the connector type identifier.""" return "tavily" + def _convertIsoCodeToCountryName(self, isoCode: str) -> str: + """ + Convert ISO-2 country code to Tavily country name. + Uses centralized CountryCodes mapping. + """ + return CountryCodes.getForTavily(isoCode) + def _extractUrlsFromPrompt(self, prompt: str) -> List[str]: """Extract URLs from a text prompt using regex.""" if not prompt: return [] # URL regex pattern - matches http/https URLs - url_pattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?' - urls = re.findall(url_pattern, prompt) + urlPattern = r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?' + urls = re.findall(urlPattern, prompt) # Remove duplicates while preserving order seen = set() - unique_urls = [] + uniqueUrls = [] for url in urls: if url not in seen: seen.add(url) - unique_urls.append(url) + uniqueUrls.append(url) - return unique_urls - - def _intelligentUrlFiltering(self, searchResults: List[WebSearchResult], query: str, maxResults: int) -> List[WebSearchResult]: - """ - Intelligent URL filtering with de-duplication and relevance scoring. - - Args: - searchResults: Raw search results from Tavily - query: Original search query for relevance scoring - maxResults: Maximum number of results to return - - Returns: - Filtered and deduplicated list of search results - """ - if not searchResults: - return [] - - # Step 1: Basic de-duplication by URL - seenUrls = set() - uniqueResults = [] - - for result in searchResults: - # Normalize URL for better deduplication - normalizedUrl = self._normalizeUrl(result.url) - if normalizedUrl not in seenUrls: - seenUrls.add(normalizedUrl) - uniqueResults.append(result) - - logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original") - - # Step 2: Relevance scoring and filtering - scoredResults = [] - queryWords = set(query.lower().split()) - - for result in uniqueResults: - score = self._calculateRelevanceScore(result, queryWords) - scoredResults.append((score, result)) - - # Step 3: Sort by relevance score (higher is better) - scoredResults.sort(key=lambda x: x[0], reverse=True) - - # Step 4: Take top results - filteredResults = [result for score, result in scoredResults[:maxResults]] - - logger.info(f"After intelligent filtering: {len(filteredResults)} results selected from {len(uniqueResults)} unique") - - return filteredResults + return uniqueUrls def _normalizeUrl(self, url: str) -> str: """ @@ -206,14 +143,14 @@ class ConnectorWeb(BaseConnectorAi): score += urlMatches * 1.5 # Content relevance (if available) - if hasattr(result, 'raw_content') and result.raw_content: - contentWords = set(result.raw_content.lower().split()) + if hasattr(result, 'rawContent') and result.rawContent: + contentWords = set(result.rawContent.lower().split()) contentMatches = len(queryWords.intersection(contentWords)) score += contentMatches * 0.1 # Lower weight for content matches # Domain authority bonus (simple heuristic) domain = result.url.split('/')[2] if '/' in 
result.url else result.url - if any(auth_domain in domain.lower() for auth_domain in + if any(authDomain in domain.lower() for authDomain in ['wikipedia.org', 'github.com', 'stackoverflow.com', 'reddit.com', 'medium.com']): score += 1.0 @@ -223,223 +160,91 @@ class ConnectorWeb(BaseConnectorAi): return score - async def _optimizeSearchQuery(self, query: str, timeRange: str = None, country: str = None, language: str = None) -> tuple[str, dict]: + def _intelligentUrlFiltering(self, searchResults: List[WebSearchResult], query: str, maxResults: int) -> List[WebSearchResult]: """ - Use AI to optimize search query and parameters (from old SubWebResearch). - - Args: - query: Original search query - timeRange: Time range filter - country: Country filter - language: Language filter - - Returns: - Tuple of (optimized_query, optimized_parameters) - """ - try: - # Create AI prompt for query optimization (from old code) - queryOptimizerPrompt = f"""You are a search query optimizer. - -USER QUERY: {query} - -Your task: Create a search query and parameters for the USER QUERY given. - -RULES: -1. The search query MUST be related to the user query above -2. Extract key terms from the user query -3. Determine appropriate country/language based on the query context -4. Keep search query short (2-6 words) - -Return ONLY this JSON format: -{{ - "user_prompt": "search query based on user query above", - "country": "Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries)", - "language": "language_code_or_null", - "topic": "general|news|academic_or_null", - "time_range": "d|w|m|y_or_null", - "selection_strategy": "single|multiple|specific_page", - "selection_criteria": "what URLs to prioritize", - "expected_url_patterns": ["pattern1", "pattern2"], - "estimated_result_count": number -}}""" - - # Use AI to optimize the query - from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions - aiRequest = AiCallRequest( - prompt=queryOptimizerPrompt, - options=AiCallOptions() - ) - - # Get AI response (this would need to be called through the AI interface) - # For now, return the original query with basic optimization - logger.info(f"AI query optimization requested for: '{query}'") - - # Basic optimization fallback - optimizedQuery = query - optimizedParams = { - "time_range": timeRange, - "country": country, - "language": language, - "topic": "general" - } - - return optimizedQuery, optimizedParams - - except Exception as e: - logger.warning(f"Query optimization failed: {str(e)}, using original query") - return query, {"time_range": timeRange, "country": country, "language": language} - - async def _aiBasedUrlSelection(self, searchResults: List[WebSearchResult], originalQuery: str, maxResults: int) -> List[WebSearchResult]: - """ - Use AI to select the most relevant URLs from search results (from old SubWebResearch). + Intelligent URL filtering with de-duplication and relevance scoring. 
Args: searchResults: Raw search results from Tavily - originalQuery: Original user query for context + query: Original search query for relevance scoring maxResults: Maximum number of results to return Returns: - AI-selected and filtered list of search results + Filtered and deduplicated list of search results """ - try: - if not searchResults: - return [] - - # Step 1: Basic de-duplication - seenUrls = set() - uniqueResults = [] - - for result in searchResults: - normalizedUrl = self._normalizeUrl(result.url) - if normalizedUrl not in seenUrls: - seenUrls.add(normalizedUrl) - uniqueResults.append(result) - - logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original") - - if len(uniqueResults) <= maxResults: - return uniqueResults - - # Step 2: AI-based URL selection (from old code) - logger.info(f"AI selecting most relevant {maxResults} URLs from {len(uniqueResults)} unique results") - - # Create AI prompt for URL selection (from old code) - urlList = "\n".join([f"{i+1}. {result.url}" for i, result in enumerate(uniqueResults)]) - aiPrompt = f"""Select the most relevant URLs from these search results: - -{urlList} - -Return only the URLs that are most relevant for the user's query: "{originalQuery}" -One URL per line. -""" - - # For now, use intelligent filtering as fallback - # In a full implementation, this would call the AI interface - logger.info("Using intelligent filtering as AI selection fallback") - - # Use the existing intelligent filtering - filteredResults = self._intelligentUrlFiltering(uniqueResults, originalQuery, maxResults) - - logger.info(f"AI-based selection completed: {len(filteredResults)} results selected") - return filteredResults - - except Exception as e: - logger.warning(f"AI-based URL selection failed: {str(e)}, using intelligent filtering") - return self._intelligentUrlFiltering(searchResults, originalQuery, maxResults) + if not searchResults: + return [] + + # Step 1: Basic de-duplication by URL + seenUrls = set() + uniqueResults = [] + + for result in searchResults: + # Normalize URL for better deduplication + normalizedUrl = self._normalizeUrl(result.url) + if normalizedUrl not in seenUrls: + seenUrls.add(normalizedUrl) + uniqueResults.append(result) + + logger.info(f"After basic deduplication: {len(uniqueResults)} unique URLs from {len(searchResults)} original") + + # Step 2: Relevance scoring and filtering + scoredResults = [] + queryWords = set(query.lower().split()) + + for result in uniqueResults: + score = self._calculateRelevanceScore(result, queryWords) + scoredResults.append((score, result)) + + # Step 3: Sort by relevance score (higher is better) + scoredResults.sort(key=lambda x: x[0], reverse=True) + + # Step 4: Take top results + filteredResults = [result for score, result in scoredResults[:maxResults]] + + logger.info(f"After intelligent filtering: {len(filteredResults)} results selected from {len(uniqueResults)} unique") + + return filteredResults def getModels(self) -> List[AiModel]: """Get all available Tavily models.""" return [ AiModel( name="tavily-search", - displayName="Tavily Search", + displayName="Tavily Search & Research", connectorType="tavily", - apiUrl="https://api.tavily.com/search", + apiUrl="https://api.tavily.com", temperature=0.0, # Web search doesn't use temperature maxTokens=0, # Web search doesn't use tokens contextLength=0, costPer1kTokensInput=0.0, costPer1kTokensOutput=0.0, - speedRating=9, # Very fast for URL discovery - qualityRating=9, # Excellent URL discovery 
quality + speedRating=8, # Good speed for search and extract + qualityRating=9, # Excellent quality for web research # capabilities removed (not used in business logic) - functionCall=self.callWebOperation, + functionCall=self._routeWebOperation, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.BASIC, operationTypes=createOperationTypeRatings( - (OperationTypeEnum.WEB_SEARCH, 10), - (OperationTypeEnum.WEB_RESEARCH, 3), - (OperationTypeEnum.WEB_CRAWL, 2), - (OperationTypeEnum.WEB_NEWS, 3), - (OperationTypeEnum.WEB_QUESTIONS, 2) + (OperationTypeEnum.WEB_SEARCH, 9), + (OperationTypeEnum.WEB_CRAWL, 8) ), version="tavily-search", - calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numRequests=1: numRequests * (1 if searchDepth == "basic" else 2) * 0.008 - ), - AiModel( - name="tavily-extract", - displayName="Tavily Extract", - connectorType="tavily", - apiUrl="https://api.tavily.com/extract", - temperature=0.0, # Web crawling doesn't use temperature - maxTokens=0, # Web crawling doesn't use tokens - contextLength=0, - costPer1kTokensInput=0.0, - costPer1kTokensOutput=0.0, - speedRating=7, # Good for content extraction - qualityRating=9, # Excellent content extraction quality - # capabilities removed (not used in business logic) - functionCall=self.callWebOperation, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.BASIC, - operationTypes=createOperationTypeRatings( - (OperationTypeEnum.WEB_CRAWL, 10), - (OperationTypeEnum.WEB_RESEARCH, 3), - (OperationTypeEnum.WEB_NEWS, 3), - (OperationTypeEnum.WEB_QUESTIONS, 2) - ), - version="tavily-extract", - calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, numPages=10, extractionDepth="basic", withInstructions=False, numSuccessfulExtractions=10: ((numPages / 10) * (2 if withInstructions else 1) + (numSuccessfulExtractions / 5) * (1 if extractionDepth == "basic" else 2)) * 0.008 - ), - AiModel( - name="tavily-search-extract", - displayName="Tavily Search & Extract", - connectorType="tavily", - apiUrl="https://api.tavily.com/search", - temperature=0.0, # Web scraping doesn't use temperature - maxTokens=0, # Web scraping doesn't use tokens - contextLength=0, - costPer1kTokensInput=0.0, - costPer1kTokensOutput=0.0, - speedRating=7, # Good for combined search+extract - qualityRating=8, # Good quality for structured data - # capabilities removed (not used in business logic) - functionCall=self.callWebOperation, - priority=PriorityEnum.BALANCED, - processingMode=ProcessingModeEnum.BASIC, - operationTypes=createOperationTypeRatings( - (OperationTypeEnum.WEB_RESEARCH, 8), - (OperationTypeEnum.WEB_SEARCH, 6), - (OperationTypeEnum.WEB_CRAWL, 6), - (OperationTypeEnum.WEB_NEWS, 5), - (OperationTypeEnum.WEB_QUESTIONS, 5) - ), - version="tavily-search-extract", - calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived, searchDepth="basic", numSuccessfulUrls=1, extractionDepth="basic": ((1 if searchDepth == "basic" else 2) + (numSuccessfulUrls / 5) * (1 if extractionDepth == "basic" else 2)) * 0.008 + calculatePriceUsd=lambda processingTime, bytesSent, bytesReceived: 0.008 # Simple flat rate ) ] @classmethod async def create(cls): - api_key = APP_CONFIG.get("Connector_AiTavily_API_SECRET") - if not api_key: + apiKey = APP_CONFIG.get("Connector_AiTavily_API_SECRET") + if not apiKey: raise ValueError("Tavily API key not configured. 
Please set Connector_AiTavily_API_SECRET in config.ini") # Load and cache web crawl related configuration crawlTimeout = int(APP_CONFIG.get("Web_Crawl_TIMEOUT", "30")) crawlMaxRetries = int(APP_CONFIG.get("Web_Crawl_MAX_RETRIES", "3")) crawlRetryDelay = int(APP_CONFIG.get("Web_Crawl_RETRY_DELAY", "2")) return cls( - client=AsyncTavilyClient(api_key=api_key), + client=AsyncTavilyClient(api_key=apiKey), crawlTimeout=crawlTimeout, crawlMaxRetries=crawlMaxRetries, crawlRetryDelay=crawlRetryDelay, @@ -449,505 +254,58 @@ One URL per line. # Standardized method using AiModelCall/AiModelResponse pattern - async def callWebOperation(self, modelCall) -> "AiModelResponse": - """ - Universal web operation handler that distributes to the correct method - based on the operationType from AiCallOptions. - """ - try: - options = modelCall.options - operationType = getattr(options, "operationType", None) - - if operationType == OperationTypeEnum.WEB_SEARCH: - return await self.search(modelCall) - elif operationType == OperationTypeEnum.WEB_CRAWL: - return await self.crawl(modelCall) - elif operationType in [OperationTypeEnum.WEB_RESEARCH, OperationTypeEnum.WEB_QUESTIONS, OperationTypeEnum.WEB_NEWS]: - return await self.research(modelCall) - else: - # Fallback to search for unknown operation types - return await self.search(modelCall) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - async def search(self, modelCall) -> "AiModelResponse": - """Search using standardized AiModelCall/AiModelResponse pattern""" - try: - # Extract parameters from modelCall - prompt_content = modelCall.messages[0]["content"] if modelCall.messages else "" - options = modelCall.options - - # Parse unified prompt JSON format - import json - promptData = json.loads(prompt_content) - - # Extract parameters from unified prompt JSON - query = promptData.get("searchPrompt", prompt_content) - maxResults = promptData.get("maxResults", 5) - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - - # Use basic search depth for web search operations - searchDepth = "basic" - - # Step 1: AI Query Optimization (from old SubWebResearch) - optimizedQuery, optimizedParams = await self._optimizeSearchQuery(query, timeRange, country, language) - - # Step 2: Get more results than requested to allow for intelligent filtering - searchResults = await self._search( - query=optimizedQuery, - max_results=min(maxResults * 3, 30), # Get more results for better AI selection - search_depth=searchDepth, - time_range=optimizedParams.get("time_range", timeRange), - country=optimizedParams.get("country", country), - language=optimizedParams.get("language", language), - include_answer=getattr(options, "include_answer", True), - include_raw_content=getattr(options, "include_raw_content", True), - ) - - # Step 3: AI-based URL selection and intelligent filtering - filteredResults = await self._aiBasedUrlSelection(searchResults, query, maxResults) - - # Convert to JSON string - resultsJson = { - "query": query, - "results": [ - { - "title": result.title, - "url": result.url, - "content": getattr(result, 'raw_content', None) - } - for result in filteredResults - ], - "total_count": len(filteredResults), - "original_count": len(searchResults), - "filtered_count": len(searchResults) - len(filteredResults) - } - - import json - content = json.dumps(resultsJson, indent=2) - - return AiModelResponse( - content=content, - success=True, - metadata={ - 
"total_count": len(filteredResults), - "search_depth": searchDepth - } - ) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - - async def crawl(self, modelCall) -> "AiModelResponse": - """Crawl using standardized AiModelCall/AiModelResponse pattern""" - try: - # Extract parameters from modelCall - promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" - options = modelCall.options - - # Parse unified prompt JSON format - import json - promptData = json.loads(promptContent) - - # Extract parameters from unified prompt JSON - urls = promptData.get("urls", []) - extractDepth = promptData.get("extractDepth", "advanced") - formatType = promptData.get("format", "markdown") - - if not urls: - return AiModelResponse( - content="No URLs provided for crawling", - success=False, - error="No URLs found in prompt data" - ) - - rawResults = await self._crawl( - urls, - extract_depth=extractDepth, - format=formatType, - ) - - # Convert to JSON string - resultsJson = { - "urls": urls, - "results": [ - { - "url": result.url, - "title": getattr(result, 'title', ''), - "content": result.content, - "extractedAt": getattr(result, 'extracted_at', '') - } - for result in rawResults - ], - "total_count": len(rawResults) - } - - import json - content = json.dumps(resultsJson, indent=2) - - return AiModelResponse( - content=content, - success=True, - metadata={ - "total_count": len(rawResults), - "urls_processed": len(urls) - } - ) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - - async def research(self, modelCall) -> "AiModelResponse": - """ - Handle WEB_RESEARCH, WEB_QUESTIONS, WEB_NEWS operations using search + crawl combination. - Single method for all three operation types with different standard settings. 
- """ - try: - # Extract parameters from modelCall - promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" - options = modelCall.options - operationType = getattr(options, "operationType", None) - - # Parse unified prompt JSON format - import json - promptData = json.loads(promptContent) - - # Extract parameters based on operation type - if operationType == OperationTypeEnum.WEB_RESEARCH: - query = promptData.get("researchPrompt", promptContent) - maxResults = promptData.get("maxResults", 8) - searchDepth = "basic" - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - topic = "general" - - elif operationType == OperationTypeEnum.WEB_QUESTIONS: - query = promptData.get("question", promptContent) - maxResults = promptData.get("maxResults", 6) - searchDepth = "basic" - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - topic = "general" - - elif operationType == OperationTypeEnum.WEB_NEWS: - query = promptData.get("newsPrompt", promptContent) - maxResults = promptData.get("maxResults", 10) - searchDepth = "basic" - timeRange = promptData.get("timeRange", "w") # Default to week for news - country = promptData.get("country") - language = promptData.get("language") - topic = "news" - - else: - # Fallback to research settings - query = promptData.get("researchPrompt", promptContent) - maxResults = promptData.get("maxResults", 5) - searchDepth = "basic" - timeRange = promptData.get("timeRange") - country = promptData.get("country") - language = promptData.get("language") - topic = "general" - - logger.info(f"Tavily {operationType} operation: query='{query}', maxResults={maxResults}, topic={topic}") - - # Step 1: Search for relevant URLs - searchResults = await self._search( - query=query, - max_results=maxResults * 2, # Get more for better selection - search_depth=searchDepth, - time_range=timeRange, - country=country, - language=language, - topic=topic, - include_answer=True, - include_raw_content=True - ) - - if not searchResults: - return AiModelResponse( - content="No search results found", - success=False, - error="No relevant URLs found for the query" - ) - - # Step 2: AI-based URL selection - selectedResults = await self._aiBasedUrlSelection(searchResults, query, maxResults) - - if not selectedResults: - return AiModelResponse( - content="No relevant URLs selected", - success=False, - error="AI could not select relevant URLs" - ) - - # Step 3: Crawl selected URLs for content - urlsToCrawl = [result.url for result in selectedResults] - crawlResults = await self._crawl( - urls=urlsToCrawl, - extract_depth="advanced", - format="markdown" - ) - - # Step 4: Combine search and crawl results - combinedResults = [] - for searchResult in selectedResults: - # Find corresponding crawl result - crawlResult = next((cr for cr in crawlResults if cr.url == searchResult.url), None) - - combinedResult = { - "title": searchResult.title, - "url": searchResult.url, - "summary": getattr(searchResult, 'raw_content', ''), - "content": crawlResult.content if crawlResult else '', - "extractedAt": getattr(crawlResult, 'extracted_at', '') if crawlResult else '' - } - combinedResults.append(combinedResult) - - # Step 5: Format response based on operation type - if operationType == "WEB_RESEARCH": - responseData = { - "query": query, - "research_results": combinedResults, - "total_count": len(combinedResults), - "operation_type": "research" - } - elif 
operationType == "WEB_QUESTIONS": - responseData = { - "question": query, - "answer_sources": combinedResults, - "total_count": len(combinedResults), - "operation_type": "questions" - } - elif operationType == "WEB_NEWS": - responseData = { - "news_query": query, - "articles": combinedResults, - "total_count": len(combinedResults), - "operation_type": "news" - } - else: - responseData = { - "query": query, - "results": combinedResults, - "total_count": len(combinedResults), - "operation_type": operationType - } - - import json - content = json.dumps(responseData, indent=2) - - return AiModelResponse( - content=content, - success=True, - metadata={ - "total_count": len(combinedResults), - "urls_searched": len(searchResults), - "urls_crawled": len(crawlResults), - "operation_type": operationType - } - ) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - - async def scrape(self, modelCall) -> "AiModelResponse": - """Scrape using standardized AiModelCall/AiModelResponse pattern""" - try: - # Extract parameters from modelCall - query = modelCall.messages[0]["content"] if modelCall.messages else "" - options = modelCall.options - - search_results = await self._search( - query=query, - max_results=getattr(options, "max_results", 5), - search_depth=getattr(options, "search_depth", None), - time_range=getattr(options, "time_range", None), - topic=getattr(options, "topic", None), - include_domains=getattr(options, "include_domains", None), - exclude_domains=getattr(options, "exclude_domains", None), - language=getattr(options, "language", None), - include_answer=getattr(options, "include_answer", None), - include_raw_content=getattr(options, "include_raw_content", None), - ) - - urls = [result.url for result in search_results] - crawl_results = await self._crawl( - urls, - extract_depth=getattr(options, "extract_depth", None), - format=getattr(options, "format", None), - ) - - # Convert to JSON string - results_json = { - "query": query, - "results": [ - { - "url": result.url, - "content": result.content - } - for result in crawl_results - ], - "total_count": len(crawl_results) - } - - import json - content = json.dumps(results_json, indent=2) - - return AiModelResponse( - content=content, - success=True, - metadata={ - "total_count": len(crawl_results), - "search_depth": getattr(options, "search_depth", "basic"), - "extract_depth": getattr(options, "extract_depth", "basic") - } - ) - - except Exception as e: - return AiModelResponse( - content="", - success=False, - error=str(e) - ) - - # Helper Functions - - async def _search_urls_raw(self, - *, - query: str, - max_results: int, - search_depth: str | None = None, - time_range: str | None = None, - topic: str | None = None, - include_domains: list[str] | None = None, - exclude_domains: list[str] | None = None, - language: str | None = None, - include_answer: bool | None = None, - include_raw_content: bool | None = None, - ) -> list["WebSearchResult"]: - return await self._search( - query=query, - max_results=max_results, - search_depth=search_depth, - time_range=time_range, - topic=topic, - include_domains=include_domains, - exclude_domains=exclude_domains, - language=language, - include_answer=include_answer, - include_raw_content=include_raw_content, - ) - - async def _crawl_urls_raw(self, - *, - urls: list[str], - extract_depth: str | None = None, - format: str | None = None, - ) -> list["WebCrawlResult"]: - return await self._crawl(urls, extract_depth=extract_depth, format=format) - - async 
def _scrape_raw(self, - *, - query: str, - max_results: int, - search_depth: str | None = None, - time_range: str | None = None, - topic: str | None = None, - include_domains: list[str] | None = None, - exclude_domains: list[str] | None = None, - language: str | None = None, - include_answer: bool | None = None, - include_raw_content: bool | None = None, - extract_depth: str | None = None, - format: str | None = None, - ) -> list["WebCrawlResult"]: - search_results = await self._search( - query=query, - max_results=max_results, - search_depth=search_depth, - time_range=time_range, - topic=topic, - include_domains=include_domains, - exclude_domains=exclude_domains, - language=language, - include_answer=include_answer, - include_raw_content=include_raw_content, - ) - urls = [result.url for result in search_results] - return await self._crawl(urls, extract_depth=extract_depth, format=format) - - def _clean_url(self, url: str) -> str: + def _cleanUrl(self, url: str) -> str: """Clean URL by removing extra text that might be appended.""" import re # Extract just the URL part, removing any extra text after it - url_match = re.match(r'(https?://[^\s,]+)', url) - if url_match: - return url_match.group(1) + urlMatch = re.match(r'(https?://[^\s,]+)', url) + if urlMatch: + return urlMatch.group(1) return url async def _search( self, query: str, - max_results: int, - search_depth: str | None = None, - time_range: str | None = None, + maxResults: int, + searchDepth: str | None = None, + timeRange: str | None = None, topic: str | None = None, - include_domains: list[str] | None = None, - exclude_domains: list[str] | None = None, + includeDomains: list[str] | None = None, + excludeDomains: list[str] | None = None, language: str | None = None, country: str | None = None, - include_answer: bool | None = None, - include_raw_content: bool | None = None, + includeAnswer: bool | None = None, + includeRawContent: bool | None = None, ) -> list[WebSearchResult]: """Calls the Tavily API to perform a web search.""" - # Make sure max_results is within the allowed range (use cached values) + # Make sure maxResults is within the allowed range (use cached values) minResults = self.webSearchMinResults maxAllowedResults = self.webSearchMaxResults - if max_results < minResults or max_results > maxAllowedResults: - raise ValueError(f"max_results must be between {minResults} and {maxAllowedResults}") + if maxResults < minResults or maxResults > maxAllowedResults: + raise ValueError(f"maxResults must be between {minResults} and {maxAllowedResults}") # Perform actual API call # Build kwargs only for provided options to avoid API rejections - kwargs: dict = {"query": query, "max_results": max_results} - if search_depth is not None: - kwargs["search_depth"] = search_depth - if time_range is not None: - kwargs["time_range"] = time_range + kwargs: dict = {"query": query, "max_results": maxResults} + if searchDepth is not None: + kwargs["search_depth"] = searchDepth + if timeRange is not None: + kwargs["time_range"] = timeRange if topic is not None: kwargs["topic"] = topic - if include_domains is not None and len(include_domains) > 0: - kwargs["include_domains"] = include_domains - if exclude_domains is not None: - kwargs["exclude_domains"] = exclude_domains + if includeDomains is not None and len(includeDomains) > 0: + kwargs["include_domains"] = includeDomains + if excludeDomains is not None: + kwargs["exclude_domains"] = excludeDomains if language is not None: kwargs["language"] = language if country is not None: 
kwargs["country"] = country - if include_answer is not None: - kwargs["include_answer"] = include_answer - if include_raw_content is not None: - kwargs["include_raw_content"] = include_raw_content + if includeAnswer is not None: + kwargs["include_answer"] = includeAnswer + if includeRawContent is not None: + kwargs["include_raw_content"] = includeRawContent logger.debug(f"Tavily.search kwargs: {kwargs}") @@ -962,8 +320,8 @@ One URL per line. return [ WebSearchResult( title=result["title"], - url=self._clean_url(result["url"]), - raw_content=result.get("raw_content") + url=self._cleanUrl(result["url"]), + rawContent=result.get("raw_content") ) for result in response["results"] ] @@ -971,7 +329,7 @@ One URL per line. async def _crawl( self, urls: list, - extract_depth: str | None = None, + extractDepth: str | None = None, format: str | None = None, ) -> list[WebCrawlResult]: """Calls the Tavily API to extract text content from URLs with retry logic.""" @@ -980,19 +338,19 @@ One URL per line. timeout = self.crawlTimeout logger.debug(f"Starting crawl of {len(urls)} URLs: {urls}") - logger.debug(f"Crawl settings: extract_depth={extract_depth}, format={format}, timeout={timeout}s") - + logger.debug(f"Crawl settings: extractDepth={extractDepth}, format={format}, timeout={timeout}s") + for attempt in range(maxRetries + 1): try: logger.debug(f"Crawl attempt {attempt + 1}/{maxRetries + 1}") # Use asyncio.wait_for for timeout # Build kwargs for extract - kwargs_extract: dict = {"urls": urls} - kwargs_extract["extract_depth"] = extract_depth or "advanced" - kwargs_extract["format"] = format or "markdown" # Use markdown to get HTML structure + kwargsExtract: dict = {"urls": urls} + kwargsExtract["extract_depth"] = extractDepth or "advanced" + kwargsExtract["format"] = format or "markdown" # Use markdown to get HTML structure - logger.debug(f"Sending request to Tavily with kwargs: {kwargs_extract}") + logger.debug(f"Sending request to Tavily with kwargs: {kwargsExtract}") # Ensure client is initialized if self.client is None: @@ -1001,7 +359,7 @@ One URL per line. raise ValueError("Tavily client not initialized. Please check API key configuration.") response = await asyncio.wait_for( - self.client.extract(**kwargs_extract), + self.client.extract(**kwargsExtract), timeout=timeout ) @@ -1022,7 +380,8 @@ One URL per line. results = [ WebCrawlResult( url=result["url"], - content=result.get("raw_content", result.get("content", "")) # Try raw_content first, fallback to content + content=result.get("raw_content", result.get("content", "")), # Try raw_content first, fallback to content + title=result.get("title", "") # Extract title if available ) for result in response["results"] ] @@ -1060,262 +419,137 @@ One URL per line. await asyncio.sleep(retryDelay) else: raise Exception(f"Crawl failed after {maxRetries + 1} attempts: {str(e)}") - - async def comprehensiveWebResearch(self, request: WebResearchRequest) -> WebResearchResult: + + async def _routeWebOperation(self, modelCall: AiModelCall) -> "AiModelResponse": """ - Perform comprehensive web research using Tavily's search and extract capabilities. - This method orchestrates the full web research workflow. 
- """ - try: - logger.info(f"COMPREHENSIVE WEB RESEARCH STARTED") - logger.info(f"User Query: {request.user_prompt}") - logger.info(f"Max Results: {request.max_results}, Max Pages: {request.max_pages}") - - # Global URL index to track all processed URLs across the entire research session - global_processed_urls = set() - - # Step 1: Find relevant websites - either provided URLs or AI-determined main URLs - logger.info(f"=== STEP 1: INITIAL MAIN URLS LIST ===") - - if request.urls: - # Use provided URLs as initial main URLs - websites = request.urls - logger.info(f"Using provided URLs ({len(websites)}):") - for i, url in enumerate(websites, 1): - logger.info(f" {i}. {url}") - else: - # Use AI to determine main URLs based on user's intention - logger.info(f"AI analyzing user intent: '{request.user_prompt}'") - - # Use basic search parameters - search_query = request.user_prompt - search_depth = request.search_depth or "basic" - time_range = request.time_range - topic = request.topic - country = request.country - language = request.language - max_results = request.max_results - - logger.info(f"Using search parameters: query='{search_query}', depth={search_depth}, time_range={time_range}, topic={topic}") - - # Perform web search - search_results = await self._search( - query=search_query, - max_results=max_results, - search_depth=search_depth, - time_range=time_range, - topic=topic, - country=country, - language=language, - include_answer=True, - include_raw_content=True - ) - - # Extract URLs from search results - websites = [result.url for result in search_results] - logger.info(f"Found {len(websites)} URLs from search") - - # AI-based URL selection and deduplication - if len(websites) > request.max_pages: - logger.info(f"AI selecting most relevant {request.max_pages} URLs from {len(websites)} found") - - # For now, just take the first max_pages URLs - selected_indices = list(range(min(request.max_pages, len(websites)))) - selected_websites = [websites[i] for i in selected_indices] - - # Remove duplicates while preserving order - seen = set() - unique_websites = [] - for url in selected_websites: - if url not in seen: - seen.add(url) - unique_websites.append(url) - - websites = unique_websites - - logger.info(f"After AI selection deduplication: {len(websites)} unique URLs") - logger.info(f"AI selected {len(websites)} main URLs (after deduplication):") - for i, url in enumerate(websites, 1): - logger.info(f" {i}. {url}") - - # Step 2: Smart website selection using AI interface - logger.info(f"=== STEP 2: FILTERED URL LIST BY USER PROMPT'S INTENTION ===") - logger.info(f"AI analyzing {len(websites)} URLs for relevance to: '{request.user_prompt}'") - - # For now, just use all websites - selected_websites = websites - - logger.debug(f"AI selected {len(selected_websites)} most relevant URLs:") - for i, url in enumerate(selected_websites, 1): - logger.debug(f" {i}. 
{url}") - - # Step 3+4+5: Recursive crawling with configurable depth - # Get configuration parameters - max_depth = int(APP_CONFIG.get("Web_Research_MAX_DEPTH", "2")) - max_links_per_domain = int(APP_CONFIG.get("Web_Research_MAX_LINKS_PER_DOMAIN", "4")) - crawl_timeout_minutes = int(APP_CONFIG.get("Web_Research_CRAWL_TIMEOUT_MINUTES", "10")) - - # Use the configured max_depth or the request's search_depth, whichever is smaller - effective_depth = min(max_depth, request.search_depth if isinstance(request.search_depth, int) else 2) - - logger.info(f"=== STEP 3+4+5: RECURSIVE CRAWLING ===") - logger.info(f"Starting recursive crawl with depth {effective_depth}") - logger.info(f"Max links per domain: {max_links_per_domain}") - logger.info(f"Crawl timeout: {crawl_timeout_minutes} minutes") - - # Perform recursive crawling - all_content = await self._crawlRecursively( - urls=selected_websites, - max_depth=effective_depth, - extract_depth=request.extract_depth, - max_per_domain=max_links_per_domain, - global_processed_urls=global_processed_urls - ) - - logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled") - - # Step 6: AI analysis of all collected content - logger.info(f"=== STEP 6: AI ANALYSIS ===") - logger.info(f"Analyzing {len(all_content)} websites with AI") - - # Create a basic analysis result - analysis_result = f"Web research completed for: {request.user_prompt}\n\n" - analysis_result += f"Analyzed {len(all_content)} websites:\n" - for url, content in all_content.items(): - analysis_result += f"- {url}: {len(content)} characters\n" - - # Create result documents - import time - result_documents = [] - - # Main research result - main_document = { - "documentName": f"web_research_{int(time.time())}.json", - "documentData": { - "user_prompt": request.user_prompt, - "websites_analyzed": len(all_content), - "additional_links_found": 0, # Would be calculated from crawl results - "analysis_result": analysis_result, - "sources": [{"title": f"Website {i+1}", "url": url} for i, url in enumerate(all_content.keys())], - "additional_links": [], - "debug_info": { - "total_urls_processed": len(global_processed_urls), - "crawl_depth": effective_depth, - "extract_depth": request.extract_depth - } - }, - "mimeType": "application/json" - } - result_documents.append(main_document) - - # Individual website content documents - for i, (url, content) in enumerate(all_content.items()): - content_document = { - "documentName": f"website_content_{i+1}.md", - "documentData": content, - "mimeType": "text/markdown" - } - result_documents.append(content_document) - - logger.info(f"WEB RESEARCH COMPLETED SUCCESSFULLY") - logger.info(f"Generated {len(result_documents)} result documents") - - return WebResearchResult( - success=True, - documents=result_documents - ) - - except Exception as e: - logger.error(f"Error in comprehensive web research: {str(e)}") - return WebResearchResult( - success=False, - error=str(e), - documents=[] - ) - - async def _crawlRecursively(self, urls: List[str], max_depth: int, extract_depth: str = "advanced", max_per_domain: int = 10, global_processed_urls: Optional[set] = None) -> Dict[str, str]: - """ - Recursively crawl URLs up to specified depth. - This is a simplified version of the recursive crawling logic. - """ - logger.info(f"Starting recursive crawl: {len(urls)} starting URLs, max_depth={max_depth}") + Route web operation based on operation type. 
- # URL index to track all processed URLs (local + global) - processed_urls = set() - if global_processed_urls is not None: - processed_urls = global_processed_urls - logger.info(f"Using global URL index with {len(processed_urls)} already processed URLs") + Args: + modelCall: AiModelCall with messages and options + + Returns: + AiModelResponse based on operation type + """ + operationType = modelCall.options.operationType + + if operationType == OperationTypeEnum.WEB_SEARCH: + return await self.webSearch(modelCall) + elif operationType == OperationTypeEnum.WEB_CRAWL: + return await self.webCrawl(modelCall) else: - logger.info("Using local URL index for this crawl session") + # Unsupported operation type + return AiModelResponse( + content="", + success=False, + error=f"Unsupported operation type: {operationType}" + ) + + async def webSearch(self, modelCall: AiModelCall) -> "AiModelResponse": + """ + WEB_SEARCH operation - returns list of URLs using Tavily search. - all_content = {} - current_level_urls = urls.copy() - - try: - for depth in range(1, max_depth + 1): - logger.info(f"=== DEPTH LEVEL {depth}/{max_depth} ===") - logger.info(f"Processing {len(current_level_urls)} URLs at depth {depth}") - - # URLs found at this level (for next iteration) - next_level_urls = [] - - for url in current_level_urls: - # Normalize URL for duplicate checking - normalized_url = self._normalizeUrl(url) - if normalized_url in processed_urls: - logger.debug(f"URL {url} (normalized: {normalized_url}) already processed, skipping") - continue - - try: - logger.info(f"Processing URL at depth {depth}: {url}") - - # Extract content from URL - crawl_results = await self._crawl([url], extract_depth=extract_depth, format="markdown") - - if crawl_results and crawl_results[0].content: - content = crawl_results[0].content - all_content[url] = content - processed_urls.add(normalized_url) - logger.info(f"✓ Successfully processed {url}: {len(content)} chars") - - # For simplicity, we'll skip finding sub-links in this implementation - # In a full implementation, you would extract links and add them to next_level_urls - - else: - logger.warning(f"✗ No content extracted from {url}") - processed_urls.add(normalized_url) - - except Exception as e: - logger.warning(f"✗ Failed to process URL {url} at depth {depth}: {e}") - processed_urls.add(normalized_url) - - # Prepare for next iteration - current_level_urls = next_level_urls - logger.info(f"Depth {depth} completed. 
Found {len(next_level_urls)} URLs for next level") - - # Stop if no more URLs to process - if not current_level_urls: - logger.info(f"No more URLs found at depth {depth}, stopping recursion") - break + Args: + modelCall: AiModelCall with AiCallPromptWebSearch as prompt - logger.info(f"Recursive crawl completed: {len(all_content)} total pages crawled") - return all_content + Returns: + AiModelResponse with JSON list of URLs + """ + try: + # Extract parameters + promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" + import json + promptData = json.loads(promptContent) + + # Create Pydantic model + webSearchPrompt = AiCallPromptWebSearch(**promptData) + + # Convert ISO country code to country name for Tavily + countryName = webSearchPrompt.country + if countryName: + countryName = self._convertIsoCodeToCountryName(countryName) + + # Perform search + searchResults = await self._search( + query=webSearchPrompt.instruction, + maxResults=webSearchPrompt.maxNumberPages, + timeRange=webSearchPrompt.timeRange, + country=countryName, + language=webSearchPrompt.language, + includeAnswer=False, + includeRawContent=False + ) + + # Extract URLs from results + urls = [result.url for result in searchResults] + + # Return as JSON array + import json + return AiModelResponse( + content=json.dumps(urls, indent=2), + success=True, + metadata={"total_urls": len(urls), "operation": "WEB_SEARCH"} + ) except Exception as e: - logger.error(f"Crawling failed with error: {e}, returning partial results: {len(all_content)} pages crawled so far") - return all_content - - def _normalizeUrl(self, url: str) -> str: - """Normalize URL to handle variations that should be considered duplicates.""" - if not url: - return url + logger.error(f"Error in Tavily web search: {str(e)}") + return AiModelResponse( + content="[]", + success=False, + error=str(e) + ) + + async def webCrawl(self, modelCall: AiModelCall) -> "AiModelResponse": + """ + WEB_CRAWL operation - crawls one URL using Tavily. 
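# Illustrative sketch (not part of the patch): the JSON payload that a WEB_SEARCH call carries in
# its single user message, and how a caller reads the connector's webSearch() answer. The keys are
# the AiCallPromptWebSearch fields; the values are example data only.
import json

webSearchPayload = json.dumps({
    "instruction": "Recent developments in battery recycling",
    "country": "ch",           # ISO-2 code; webSearch() converts it to a country name before calling _search
    "maxNumberPages": 5,
    "timeRange": "m",
    "language": "en",
})

def readWebSearchUrls(responseContent: str) -> list:
    # webSearch() returns an AiModelResponse whose content is a JSON array of URLs,
    # e.g. '["https://example.org/a", "https://example.org/b"]'.
    return json.loads(responseContent)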
- # Remove trailing slashes and fragments - url = url.rstrip('/') - if '#' in url: - url = url.split('#')[0] - - # Handle common URL variations - url = url.replace('http://', 'https://') # Normalize protocol - - return url + Args: + modelCall: AiModelCall with AiCallPromptWebCrawl as prompt + + Returns: + AiModelResponse with crawl results as JSON + """ + try: + # Extract parameters + promptContent = modelCall.messages[0]["content"] if modelCall.messages else "" + import json + promptData = json.loads(promptContent) + + # Create Pydantic model + webCrawlPrompt = AiCallPromptWebCrawl(**promptData) + + # Perform crawl for ONE URL + # Note: _crawl expects a list, so we wrap the single URL in a list + crawlResults = await self._crawl( + urls=[webCrawlPrompt.url], + extractDepth="advanced" if webCrawlPrompt.maxDepth > 2 else "basic", + format="markdown" + ) + + # Format result for single URL - consistent with Perplexity format + if crawlResults and len(crawlResults) > 0: + firstResult = crawlResults[0] + resultData = { + "url": firstResult.url, + "title": firstResult.title if firstResult.title else "Content", + "content": firstResult.content + } + else: + resultData = {"url": webCrawlPrompt.url, "title": "", "content": "", "error": "No content extracted"} + + # Return as JSON - same format as Perplexity + import json + return AiModelResponse( + content=json.dumps(resultData, indent=2), + success=True, + metadata={"operation": "WEB_CRAWL", "url": webCrawlPrompt.url} + ) + + except Exception as e: + logger.error(f"Error in Tavily web crawl: {str(e)}") + import json + errorResult = {"error": str(e), "url": ""} + return AiModelResponse( + content=json.dumps(errorResult, indent=2), + success=False, + error=str(e) + ) diff --git a/modules/datamodels/datamodelAi.py b/modules/datamodels/datamodelAi.py index 31387e28..b7e883d1 100644 --- a/modules/datamodels/datamodelAi.py +++ b/modules/datamodels/datamodelAi.py @@ -21,11 +21,8 @@ class OperationTypeEnum(str, Enum): IMAGE_GENERATE = "imageGenerate" # Web Operations - WEB_SEARCH = "webSearch" # Returns list of URLs only - WEB_CRAWL = "webCrawl" # Returns content from given URLs - WEB_RESEARCH = "webResearch" # WEB_SEARCH + WEB_CRAWL combined (scrape function) - WEB_QUESTIONS = "webQuestions" # Question-answering web research - WEB_NEWS = "webNews" # News-specific web research + WEB_SEARCH = "webSearch" # Returns list of URLs only + WEB_CRAWL = "webCrawl" # Web crawl for a given URL # Operation Type Rating - Helper class for capability ratings @@ -49,8 +46,8 @@ def createOperationTypeRatings(*ratings: Tuple[OperationTypeEnum, int]) -> List[ Usage: operationTypes = createOperationTypeRatings( (OperationTypeEnum.DATA_ANALYSE, 8), - (OperationTypeEnum.WEB_RESEARCH, 10), - (OperationTypeEnum.WEB_NEWS, 7) + (OperationTypeEnum.WEB_SEARCH, 10), + (OperationTypeEnum.WEB_CRAWL, 9) ) """ return [OperationTypeRating(operationType=ot, rating=rating) for ot, rating in ratings] @@ -195,3 +192,42 @@ class AiModelResponse(BaseModel): class Config: arbitraryTypesAllowed = True + +# Structured prompt models for specialized operations +class AiCallPromptWebSearch(BaseModel): + """Structured prompt format for WEB_SEARCH operation - returns list of URLs.""" + + instruction: str = Field(description="Search instruction/query for finding relevant URLs") + country: Optional[str] = Field(default=None, description="Two-digit country code (lowercase, e.g., ch, us, de, fr)") + maxNumberPages: Optional[int] = Field(default=10, description="Maximum number of pages to search (default: 
10)") + timeRange: Optional[str] = Field(default=None, description="Time range filter (d, w, m, y)") + language: Optional[str] = Field(default=None, description="Language code (lowercase, e.g., de, en, fr)") + researchDepth: Optional[str] = Field(default="general", description="Research depth: fast (maxDepth=1), general (maxDepth=2), deep (maxDepth=3)") + + class Config: + pass + + +class AiCallPromptWebCrawl(BaseModel): + """Structured prompt format for WEB_CRAWL operation - crawls ONE specific URL and returns content.""" + + instruction: str = Field(description="Instruction for what content to extract from URL") + url: str = Field(description="Single URL to crawl") + maxDepth: Optional[int] = Field(default=2, description="Maximum number of hops from starting page (default: 2)") + maxWidth: Optional[int] = Field(default=10, description="Maximum pages to crawl per level (default: 10)") + + class Config: + pass + + +class AiCallPromptImage(BaseModel): + """Structured prompt format for image generation.""" + + prompt: str = Field(description="Text description of the image to generate") + size: Optional[str] = Field(default="1024x1024", description="Image size (1024x1024, 1792x1024, 1024x1792)") + quality: Optional[str] = Field(default="standard", description="Image quality (standard, hd)") + style: Optional[str] = Field(default="vivid", description="Image style (vivid, natural)") + + class Config: + pass + diff --git a/modules/datamodels/datamodelTools.py b/modules/datamodels/datamodelTools.py new file mode 100644 index 00000000..39ba8bda --- /dev/null +++ b/modules/datamodels/datamodelTools.py @@ -0,0 +1,225 @@ +""" +Utility data models and classes for common tools and mappings. +""" + +class CountryCodes: + """ + Centralized country code mapping for different services. + + Maps ISO-2 country codes to service-specific country names. + Each service may have different requirements for country names. 
+ """ + + # Mapping: ISO-2 code -> (Tavily country name, Perplexity country name) + _COUNTRY_MAP = { + "AF": ("afghanistan", "Afghanistan"), + "AL": ("albania", "Albania"), + "DZ": ("algeria", "Algeria"), + "AD": ("andorra", "Andorra"), + "AO": ("angola", "Angola"), + "AR": ("argentina", "Argentina"), + "AM": ("armenia", "Armenia"), + "AU": ("australia", "Australia"), + "AT": ("austria", "Austria"), + "AZ": ("azerbaijan", "Azerbaijan"), + "BS": ("bahamas", "Bahamas"), + "BH": ("bahrain", "Bahrain"), + "BD": ("bangladesh", "Bangladesh"), + "BB": ("barbados", "Barbados"), + "BY": ("belarus", "Belarus"), + "BE": ("belgium", "Belgium"), + "BZ": ("belize", "Belize"), + "BJ": ("benin", "Benin"), + "BT": ("bhutan", "Bhutan"), + "BO": ("bolivia", "Bolivia"), + "BA": ("bosnia and herzegovina", "Bosnia and Herzegovina"), + "BW": ("botswana", "Botswana"), + "BR": ("brazil", "Brazil"), + "BN": ("brunei", "Brunei"), + "BG": ("bulgaria", "Bulgaria"), + "BF": ("burkina faso", "Burkina Faso"), + "BI": ("burundi", "Burundi"), + "KH": ("cambodia", "Cambodia"), + "CM": ("cameroon", "Cameroon"), + "CA": ("canada", "Canada"), + "CV": ("cape verde", "Cape Verde"), + "CF": ("central african republic", "Central African Republic"), + "TD": ("chad", "Chad"), + "CL": ("chile", "Chile"), + "CN": ("china", "China"), + "CO": ("colombia", "Colombia"), + "KM": ("comoros", "Comoros"), + "CG": ("congo", "Congo"), + "CR": ("costa rica", "Costa Rica"), + "HR": ("croatia", "Croatia"), + "CU": ("cuba", "Cuba"), + "CY": ("cyprus", "Cyprus"), + "CZ": ("czech republic", "Czech Republic"), + "DK": ("denmark", "Denmark"), + "DJ": ("djibouti", "Djibouti"), + "DO": ("dominican republic", "Dominican Republic"), + "EC": ("ecuador", "Ecuador"), + "EG": ("egypt", "Egypt"), + "SV": ("el salvador", "El Salvador"), + "GQ": ("equatorial guinea", "Equatorial Guinea"), + "ER": ("eritrea", "Eritrea"), + "EE": ("estonia", "Estonia"), + "ET": ("ethiopia", "Ethiopia"), + "FJ": ("fiji", "Fiji"), + "FI": ("finland", "Finland"), + "FR": ("france", "France"), + "GA": ("gabon", "Gabon"), + "GM": ("gambia", "Gambia"), + "GE": ("georgia", "Georgia"), + "DE": ("germany", "Germany"), + "GH": ("ghana", "Ghana"), + "GR": ("greece", "Greece"), + "GT": ("guatemala", "Guatemala"), + "GN": ("guinea", "Guinea"), + "HT": ("haiti", "Haiti"), + "HN": ("honduras", "Honduras"), + "HU": ("hungary", "Hungary"), + "IS": ("iceland", "Iceland"), + "IN": ("india", "India"), + "ID": ("indonesia", "Indonesia"), + "IR": ("iran", "Iran"), + "IQ": ("iraq", "Iraq"), + "IE": ("ireland", "Ireland"), + "IL": ("israel", "Israel"), + "IT": ("italy", "Italy"), + "JM": ("jamaica", "Jamaica"), + "JP": ("japan", "Japan"), + "JO": ("jordan", "Jordan"), + "KZ": ("kazakhstan", "Kazakhstan"), + "KE": ("kenya", "Kenya"), + "KW": ("kuwait", "Kuwait"), + "KG": ("kyrgyzstan", "Kyrgyzstan"), + "LV": ("latvia", "Latvia"), + "LB": ("lebanon", "Lebanon"), + "LS": ("lesotho", "Lesotho"), + "LR": ("liberia", "Liberia"), + "LY": ("libya", "Libya"), + "LI": ("liechtenstein", "Liechtenstein"), + "LT": ("lithuania", "Lithuania"), + "LU": ("luxembourg", "Luxembourg"), + "MG": ("madagascar", "Madagascar"), + "MW": ("malawi", "Malawi"), + "MY": ("malaysia", "Malaysia"), + "MV": ("maldives", "Maldives"), + "ML": ("mali", "Mali"), + "MT": ("malta", "Malta"), + "MR": ("mauritania", "Mauritania"), + "MU": ("mauritius", "Mauritius"), + "MX": ("mexico", "Mexico"), + "MD": ("moldova", "Moldova"), + "MC": ("monaco", "Monaco"), + "MN": ("mongolia", "Mongolia"), + "ME": ("montenegro", "Montenegro"), + "MA": ("morocco", 
"Morocco"), + "MZ": ("mozambique", "Mozambique"), + "MM": ("myanmar", "Myanmar"), + "NA": ("namibia", "Namibia"), + "NP": ("nepal", "Nepal"), + "NL": ("netherlands", "Netherlands"), + "NZ": ("new zealand", "New Zealand"), + "NI": ("nicaragua", "Nicaragua"), + "NE": ("niger", "Niger"), + "NG": ("nigeria", "Nigeria"), + "KP": ("north korea", "North Korea"), + "MK": ("north macedonia", "North Macedonia"), + "NO": ("norway", "Norway"), + "OM": ("oman", "Oman"), + "PK": ("pakistan", "Pakistan"), + "PA": ("panama", "Panama"), + "PG": ("papua new guinea", "Papua New Guinea"), + "PY": ("paraguay", "Paraguay"), + "PE": ("peru", "Peru"), + "PH": ("philippines", "Philippines"), + "PL": ("poland", "Poland"), + "PT": ("portugal", "Portugal"), + "QA": ("qatar", "Qatar"), + "RO": ("romania", "Romania"), + "RU": ("russia", "Russia"), + "RW": ("rwanda", "Rwanda"), + "SA": ("saudi arabia", "Saudi Arabia"), + "SN": ("senegal", "Senegal"), + "RS": ("serbia", "Serbia"), + "SG": ("singapore", "Singapore"), + "SK": ("slovakia", "Slovakia"), + "SI": ("slovenia", "Slovenia"), + "SO": ("somalia", "Somalia"), + "ZA": ("south africa", "South Africa"), + "KR": ("south korea", "South Korea"), + "SS": ("south sudan", "South Sudan"), + "ES": ("spain", "Spain"), + "LK": ("sri lanka", "Sri Lanka"), + "SD": ("sudan", "Sudan"), + "SE": ("sweden", "Sweden"), + "CH": ("switzerland", "Switzerland"), + "SY": ("syria", "Syria"), + "TW": ("taiwan", "Taiwan"), + "TJ": ("tajikistan", "Tajikistan"), + "TZ": ("tanzania", "Tanzania"), + "TH": ("thailand", "Thailand"), + "TG": ("togo", "Togo"), + "TT": ("trinidad and tobago", "Trinidad and Tobago"), + "TN": ("tunisia", "Tunisia"), + "TR": ("turkey", "Turkey"), + "TM": ("turkmenistan", "Turkmenistan"), + "UG": ("uganda", "Uganda"), + "UA": ("ukraine", "Ukraine"), + "AE": ("united arab emirates", "United Arab Emirates"), + "GB": ("united kingdom", "United Kingdom"), + "US": ("united states", "United States"), + "UY": ("uruguay", "Uruguay"), + "UZ": ("uzbekistan", "Uzbekistan"), + "VE": ("venezuela", "Venezuela"), + "VN": ("vietnam", "Vietnam"), + "YE": ("yemen", "Yemen"), + "ZM": ("zambia", "Zambia"), + "ZW": ("zimbabwe", "Zimbabwe"), + } + + @classmethod + def getForTavily(cls, isoCode: str) -> str: + """ + Get Tavily-compatible country name from ISO-2 code. + + Args: + isoCode: ISO-2 country code (e.g., "CH", "US") + + Returns: + Country name in lowercase as required by Tavily (e.g., "switzerland", "united states") + """ + isoCodeUpper = isoCode.upper() + mapping = cls._COUNTRY_MAP.get(isoCodeUpper) + return mapping[0] if mapping else isoCode + + @classmethod + def getForPerplexity(cls, isoCode: str) -> str: + """ + Get Perplexity-compatible country name from ISO-2 code. + + Args: + isoCode: ISO-2 country code (e.g., "CH", "US") + + Returns: + Full country name as required by Perplexity (e.g., "Switzerland", "United States") + """ + isoCodeUpper = isoCode.upper() + mapping = cls._COUNTRY_MAP.get(isoCodeUpper) + return mapping[1] if mapping else isoCode + + @classmethod + def isValid(cls, isoCode: str) -> bool: + """ + Check if ISO-2 code is valid. 
+ + Args: + isoCode: ISO-2 country code to check + + Returns: + True if valid, False otherwise + """ + return isoCode.upper() in cls._COUNTRY_MAP + diff --git a/modules/services/__init__.py b/modules/services/__init__.py index 384ae6af..0f269e28 100644 --- a/modules/services/__init__.py +++ b/modules/services/__init__.py @@ -81,6 +81,9 @@ class Services: from .serviceUtils.mainServiceUtils import UtilsService self.utils = PublicService(UtilsService(self)) + from .serviceWeb.mainServiceWeb import WebService + self.web = PublicService(WebService(self)) + def getInterface(user: User, workflow: ChatWorkflow) -> Services: return Services(user, workflow) diff --git a/modules/services/serviceAi/mainServiceAi.py b/modules/services/serviceAi/mainServiceAi.py index 6e199678..2b876de3 100644 --- a/modules/services/serviceAi/mainServiceAi.py +++ b/modules/services/serviceAi/mainServiceAi.py @@ -3,7 +3,6 @@ from typing import Dict, Any, List, Optional, Union from modules.datamodels.datamodelChat import PromptPlaceholder, ChatDocument from modules.services.serviceExtraction.mainServiceExtraction import ExtractionService from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, PriorityEnum -from modules.aicore.aicorePluginTavily import WebResearchRequest, WebResearchResult from modules.interfaces.interfaceAiObjects import AiObjects from modules.services.serviceAi.subCoreAi import SubCoreAi from modules.services.serviceAi.subDocumentProcessing import SubDocumentProcessing diff --git a/modules/services/serviceWeb/mainServiceWeb.py b/modules/services/serviceWeb/mainServiceWeb.py new file mode 100644 index 00000000..fc08aa7c --- /dev/null +++ b/modules/services/serviceWeb/mainServiceWeb.py @@ -0,0 +1,314 @@ +""" +Web crawl service for handling web research operations. +Manages the two-step process: WEB_SEARCH then WEB_CRAWL. +""" + +import json +import logging +from typing import Dict, Any, List, Optional +from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptWebSearch, AiCallPromptWebCrawl + +logger = logging.getLogger(__name__) + + +class WebcrawlService: + """Service for web search and crawling operations.""" + + def __init__(self, services): + """Initialize webcrawl service with service center access.""" + self.services = services + + async def performWebResearch( + self, + prompt: str, + urls: List[str], + country: Optional[str], + language: Optional[str], + researchDepth: str = "general", + operationId: str = None + ) -> Dict[str, Any]: + """ + Perform web research in two steps: + 1. Use AI to analyze prompt and extract parameters + URLs + 2. Call WEB_SEARCH to get URLs (if needed) + 3. Combine URLs and filter to maxNumberPages + 4. Call WEB_CRAWL for each URL + 5. 
Return consolidated result + + Args: + prompt: Natural language research prompt + urls: Optional list of URLs provided by user + country: Optional country code + language: Optional language code + operationId: Operation ID for progress tracking + + Returns: + Consolidated research results as dictionary + """ + try: + # Step 1: AI intention analysis - extract URLs and parameters from prompt + self.services.workflow.progressLogUpdate(operationId, 0.1, "Analyzing research intent") + + analysisResult = await self._analyzeResearchIntent(prompt, urls, country, language, researchDepth) + + # Extract parameters from AI analysis + instruction = analysisResult.get("instruction", prompt) + extractedUrls = analysisResult.get("urls", []) + needsSearch = analysisResult.get("needsSearch", True) # Default to True + maxNumberPages = analysisResult.get("maxNumberPages", 10) + timeRange = analysisResult.get("timeRange") + countryCode = analysisResult.get("country", country) + languageCode = analysisResult.get("language", language) + finalResearchDepth = analysisResult.get("researchDepth", researchDepth) + + logger.info(f"AI Analysis: instruction='{instruction[:100]}...', urls={len(extractedUrls)}, needsSearch={needsSearch}, maxNumberPages={maxNumberPages}, researchDepth={finalResearchDepth}") + + # Combine URLs (from user + from prompt extraction) + allUrls = [] + if urls: + allUrls.extend(urls) + if extractedUrls: + allUrls.extend(extractedUrls) + + # Step 2: Search for URLs if needed (based on needsSearch flag) + if needsSearch and (not allUrls or len(allUrls) < maxNumberPages): + self.services.workflow.progressLogUpdate(operationId, 0.3, "Searching for URLs") + + searchUrls = await self._performWebSearch( + instruction=instruction, + maxNumberPages=maxNumberPages - len(allUrls), + timeRange=timeRange, + country=countryCode, + language=languageCode + ) + + # Add search URLs to the list + allUrls.extend(searchUrls) + + self.services.workflow.progressLogUpdate(operationId, 0.5, f"Found {len(allUrls)} total URLs") + + # Step 3: Filter to maxNumberPages (simple cut, no intelligent filtering) + if len(allUrls) > maxNumberPages: + allUrls = allUrls[:maxNumberPages] + logger.info(f"Limited URLs to {maxNumberPages}") + + if not allUrls: + return {"error": "No URLs found to crawl"} + + # Step 4: Translate researchDepth to maxDepth + depthMap = {"fast": 1, "general": 2, "deep": 3} + maxDepth = depthMap.get(finalResearchDepth.lower(), 2) + + # Step 5: Crawl all URLs + self.services.workflow.progressLogUpdate(operationId, 0.6, f"Crawling {len(allUrls)} URLs") + + crawlResult = await self._performWebCrawl( + instruction=instruction, + urls=allUrls, + maxDepth=maxDepth + ) + + self.services.workflow.progressLogUpdate(operationId, 0.9, "Consolidating results") + + # Return consolidated result + return { + "instruction": instruction, + "urls_crawled": allUrls, + "total_urls": len(allUrls), + "results": crawlResult, + "total_results": len(crawlResult) if isinstance(crawlResult, list) else 1 + } + + except Exception as e: + logger.error(f"Error in web research: {str(e)}") + raise + + async def _analyzeResearchIntent( + self, + prompt: str, + urls: List[str], + country: Optional[str], + language: Optional[str], + researchDepth: str = "general" + ) -> Dict[str, Any]: + """ + Use AI to analyze prompt and extract: + - URLs from the prompt text + - Research instruction + - maxNumberPages, timeRange, country, language from context + """ + # Build analysis prompt for AI + analysisPrompt = f"""Analyze this web research request and 
extract structured information. + +RESEARCH REQUEST: +{prompt} + +USER PROVIDED: +- URLs: {json.dumps(urls) if urls else "None"} +- Country: {country or "Not specified"} +- Language: {language or "Not specified"} + +Extract and provide a JSON response with: +1. instruction: The core research instruction (cleaned prompt without URLs) +2. urls: List of URLs found in the prompt text +3. needsSearch: true if web search is needed to identify url's to crawl, false if only crawling of provided URLs is wanted +4. maxNumberPages: Recommended number of URLs to crawl (based on research scope, typical: 2-20) +5. timeRange: Time range if mentioned (d, w, m, y, or null) +6. country: Country code if specified (2-digit lowercase, e.g., ch, us, de) +7. language: Language code if specified (lowercase, e.g., de, en, fr) +8. researchDepth: Research depth based on instruction complexity - "fast" (quick overview, maxDepth=1), "general" (standard research, maxDepth=2), or "deep" (comprehensive research, maxDepth=3) + +Return ONLY valid JSON, no additional text: +{{ + "instruction": "cleaned research instruction", + "urls": ["url1", "url2"], + "needsSearch": true, + "maxNumberPages": 10, + "timeRange": null, + "country": "ch", + "language": "de", + "researchDepth": "general" +}}""" + + try: + # Call AI planning to analyze intent + analysisJson = await self.services.ai.callAiPlanning(analysisPrompt) + + # Parse JSON response + result = json.loads(analysisJson) + + logger.info(f"Intent analysis result: {result}") + return result + + except Exception as e: + logger.warning(f"Error in AI intent analysis: {str(e)}") + # Fallback to basic extraction + return { + "instruction": prompt, + "urls": [], + "needsSearch": True, + "maxNumberPages": 10, + "timeRange": None, + "country": country, + "language": language, + "researchDepth": researchDepth + } + + async def _performWebSearch( + self, + instruction: str, + maxNumberPages: int, + timeRange: Optional[str], + country: Optional[str], + language: Optional[str] + ) -> List[str]: + """Perform web search to find URLs.""" + try: + # Build search prompt model + searchPromptModel = AiCallPromptWebSearch( + instruction=instruction, + country=country, + maxNumberPages=maxNumberPages, + timeRange=timeRange, + language=language + ) + searchPrompt = searchPromptModel.model_dump_json(exclude_none=True, indent=2) + + # Call AI with WEB_SEARCH operation + searchOptions = AiCallOptions( + operationType=OperationTypeEnum.WEB_SEARCH, + resultFormat="json" + ) + + searchResult = await self.services.ai.callAiDocuments( + prompt=searchPrompt, + documents=None, + options=searchOptions, + outputFormat="json" + ) + + # Parse and extract URLs + if isinstance(searchResult, str): + searchData = json.loads(searchResult) + else: + searchData = searchResult + + # Extract URLs from response + urls = [] + if isinstance(searchData, dict): + if "urls" in searchData: + urls = searchData["urls"] + elif "results" in searchData: + urls = [r.get("url") for r in searchData["results"] if r.get("url")] + elif isinstance(searchData, list): + urls = [item.get("url") for item in searchData if item.get("url")] + + logger.info(f"Web search returned {len(urls)} URLs") + return urls + + except Exception as e: + logger.error(f"Error in web search: {str(e)}") + return [] + + async def _performWebCrawl( + self, + instruction: str, + urls: List[str], + maxDepth: int = 2 + ) -> List[Dict[str, Any]]: + """Perform web crawl on list of URLs - calls plugin for each URL individually.""" + crawlResults = [] + + # Loop over each 
URL and crawl one at a time + for url in urls: + try: + logger.info(f"Crawling URL: {url}") + + # Build crawl prompt model for single URL + crawlPromptModel = AiCallPromptWebCrawl( + instruction=instruction, + url=url, # Single URL + maxDepth=maxDepth, + maxWidth=10 + ) + crawlPrompt = crawlPromptModel.model_dump_json(exclude_none=True, indent=2) + + # Call AI with WEB_CRAWL operation + crawlOptions = AiCallOptions( + operationType=OperationTypeEnum.WEB_CRAWL, + resultFormat="json" + ) + + crawlResult = await self.services.ai.callAiDocuments( + prompt=crawlPrompt, + documents=None, + options=crawlOptions, + outputFormat="json" + ) + + # Parse crawl result + if isinstance(crawlResult, str): + try: + crawlData = json.loads(crawlResult) + except: + crawlData = {"url": url, "content": crawlResult} + else: + crawlData = crawlResult + + # Ensure it's a list of results + if isinstance(crawlData, list): + crawlResults.extend(crawlData) + elif isinstance(crawlData, dict): + if "results" in crawlData: + crawlResults.extend(crawlData["results"]) + else: + crawlResults.append(crawlData) + else: + crawlResults.append({"url": url, "content": str(crawlData)}) + + except Exception as e: + logger.error(f"Error crawling URL {url}: {str(e)}") + crawlResults.append({"url": url, "error": str(e)}) + + return crawlResults + diff --git a/modules/workflows/methods/methodAi.py b/modules/workflows/methods/methodAi.py index f2b4ffdc..708ee91b 100644 --- a/modules/workflows/methods/methodAi.py +++ b/modules/workflows/methods/methodAi.py @@ -10,9 +10,7 @@ from datetime import datetime, UTC from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelChat import ActionResult -from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum -from modules.datamodels.datamodelChat import ChatDocument -from modules.aicore.aicorePluginTavily import WebResearchRequest +from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptImage logger = logging.getLogger(__name__) @@ -163,272 +161,50 @@ class MethodAi(MethodBase): ) - @action - async def webSearch(self, parameters: Dict[str, Any]) -> ActionResult: - """ - GENERAL: - - Purpose: Search the web and return a list of relevant URLs only. - - Input requirements: searchPrompt (required); optional maxResults, timeRange, country, language. - - Output format: JSON with search results and URLs. - - Parameters: - - searchPrompt (str, required): Natural language search prompt describing what to search for. - - maxResults (int, optional): Maximum number of search results. Default: 5. - - timeRange (str, optional): d | w | m | y for time filtering. - - country (str, optional): Country name for localized results. - - language (str, optional): Language code (e.g., de, en, fr). 
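# Illustrative sketch (not part of the patch): the URL-planning rules used by
# WebcrawlService.performWebResearch above, reduced to a pure function. The behaviour
# (concatenate user-provided and searched URLs, hard cut at maxNumberPages, map researchDepth to
# maxDepth) mirrors the service code; the function name planCrawl is hypothetical.
from typing import List, Tuple

def planCrawl(userUrls: List[str], searchUrls: List[str], maxNumberPages: int, researchDepth: str) -> Tuple[List[str], int]:
    depthMap = {"fast": 1, "general": 2, "deep": 3}
    maxDepth = depthMap.get(researchDepth.lower(), 2)
    allUrls = list(userUrls) + list(searchUrls)
    if len(allUrls) > maxNumberPages:
        allUrls = allUrls[:maxNumberPages]  # simple cut, no intelligent filtering
    return allUrls, maxDepth

# planCrawl(["https://a.example"], ["https://b.example", "https://c.example"], 2, "deep")
# -> (["https://a.example", "https://b.example"], 3)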
- """ - try: - searchPrompt = parameters.get("searchPrompt") - if not searchPrompt: - return ActionResult.isFailure(error="Search prompt is required") - - # Extract optional parameters - maxResults = parameters.get("maxResults", 5) - timeRange = parameters.get("timeRange") - country = parameters.get("country") - language = parameters.get("language") - - # Build AI call options for web search - options = AiCallOptions( - operationType=OperationTypeEnum.WEB_SEARCH, - resultFormat="json" - ) - - # Create unified prompt JSON that both Tavily and Perplexity can understand - promptData = { - "searchPrompt": searchPrompt, - "maxResults": maxResults, - "timeRange": timeRange, - "country": country, - "language": language, - "instructions": "Search the web and return a JSON response with a 'results' array containing objects with 'title', 'url', and optionally 'content' fields. Focus on finding relevant URLs for the search prompt." - } - - import json - prompt = json.dumps(promptData, indent=2) - - # Call AI service through unified path - result = await self.services.ai.callAiDocuments( - prompt=prompt, - documents=None, - options=options, - outputFormat="json" - ) - - # Process result to ensure consistent format - processedResult = self._processWebSearchResult(result) - - # Create meaningful filename - meaningfulName = self._generateMeaningfulFileName( - base_name="web_search", - extension="json", - action_name="search" - ) - - from modules.datamodels.datamodelChat import ActionDocument - actionDocument = ActionDocument( - documentName=meaningfulName, - documentData=processedResult, - mimeType="application/json" - ) - - return ActionResult.isSuccess(documents=[actionDocument]) - - except Exception as e: - logger.error(f"Error in web search: {str(e)}") - return ActionResult.isFailure(error=str(e)) - - def _processWebSearchResult(self, result: str) -> str: - """ - Process web search result to ensure consistent JSON format with URL list. - Both Tavily and Perplexity now return proper JSON format. 
- """ - try: - import json - data = json.loads(result) - - # If it's already a proper search result format, return as-is - if isinstance(data, dict) and "results" in data: - return result - - # If it's a different JSON format, try to extract URLs - if isinstance(data, dict): - # Look for URL patterns in the JSON - urls = self._extractUrlsFromJson(data) - if urls: - processedData = { - "query": data.get("query", "web search"), - "results": [{"title": f"Result {i+1}", "url": url} for i, url in enumerate(urls)], - "total_count": len(urls) - } - return json.dumps(processedData, indent=2) - - # No URLs found, return original result in a structured format - processedData = { - "query": "web search", - "results": [], - "total_count": 0, - "raw_response": result - } - return json.dumps(processedData, indent=2) - - except Exception as e: - logger.warning(f"Error processing web search result: {str(e)}") - # Return original result wrapped in error format - errorData = { - "query": "web search", - "results": [], - "total_count": 0, - "error": f"Failed to process result: {str(e)}", - "raw_response": result - } - return json.dumps(errorData, indent=2) - - def _extractUrlsFromJson(self, data: Dict[str, Any]) -> List[str]: - """Extract URLs from JSON data structure.""" - urls = [] - - def _extractFromValue(value): - if isinstance(value, str): - # Check if it's a URL - if value.startswith(('http://', 'https://')): - urls.append(value) - elif isinstance(value, dict): - for v in value.values(): - _extractFromValue(v) - elif isinstance(value, list): - for item in value: - _extractFromValue(item) - - _extractFromValue(data) - return list(set(urls)) # Remove duplicates - - - @action - async def webCrawl(self, parameters: Dict[str, Any]) -> ActionResult: - """ - GENERAL: - - Purpose: Extract content from specific URLs. - - Input requirements: urls (required); optional extractDepth, format. - - Output format: JSON with extracted content from URLs. - - Parameters: - - urls (list, required): List of URLs to crawl and extract content from. - - extractDepth (str, optional): basic | advanced. Default: advanced. - - format (str, optional): markdown | html | text. Default: markdown. - """ - try: - urls = parameters.get("urls") - if not urls or not isinstance(urls, list): - return ActionResult.isFailure(error="URLs list is required") - - # Extract optional parameters - extractDepth = parameters.get("extractDepth", "advanced") - formatType = parameters.get("format", "markdown") - - # Build AI call options for web crawling - options = AiCallOptions( - operationType=OperationTypeEnum.WEB_CRAWL, - resultFormat="json" - ) - - # Create unified prompt JSON for web crawling - promptData = { - "urls": urls, - "extractDepth": extractDepth, - "format": formatType, - "instructions": "Extract content from the provided URLs and return a JSON response with 'results' array containing objects with 'url', 'title', 'content', and 'extractedAt' fields." 
- } - - import json - prompt = json.dumps(promptData, indent=2) - - # Call AI service through unified path - result = await self.services.ai.callAiDocuments( - prompt=prompt, - documents=None, - options=options, - outputFormat="json" - ) - - # Create meaningful filename - meaningfulName = self._generateMeaningfulFileName( - base_name="web_crawl", - extension="json", - action_name="crawl" - ) - - from modules.datamodels.datamodelChat import ActionDocument - actionDocument = ActionDocument( - documentName=meaningfulName, - documentData=result, - mimeType="application/json" - ) - - return ActionResult.isSuccess(documents=[actionDocument]) - - except Exception as e: - logger.error(f"Error in web crawl: {str(e)}") - return ActionResult.isFailure(error=str(e)) - - @action async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - - Purpose: Comprehensive web research combining search and content extraction. - - Input requirements: researchPrompt (required); optional maxResults, urls, timeRange, country, language. - - Output format: JSON with research results, sources, and analysis. + - Purpose: Web research with two-step process: search for URLs, then crawl content. + - Input requirements: prompt (required); optional list(url), country, language, researchDepth. + - Output format: JSON with research results including URLs and content. Parameters: - - researchPrompt (str, required): Natural language research prompt describing what to research. - - maxResults (int, optional): Maximum search results. Default: 5. - - urls (list, optional): Specific URLs to include in research. - - timeRange (str, optional): d | w | m | y for time filtering. - - country (str, optional): Country name for localized results. - - language (str, optional): Language code (e.g., de, en, fr). + - prompt (str, required): Natural language research instruction, including time range if relevant. + - list(url) (list, optional): Specific URLs to crawl, if needed. + - country (str, optional): Two-digit country code (lowercase, e.g., ch, us, de). + - language (str, optional): Language code (lowercase, e.g., de, en, fr). + - researchDepth (str, optional): Research depth - fast, general, or deep. Default: general. """ try: - researchPrompt = parameters.get("researchPrompt") - if not researchPrompt: + prompt = parameters.get("prompt") + if not prompt: return ActionResult.isFailure(error="Research prompt is required") - # Extract optional parameters - maxResults = parameters.get("maxResults", 5) - urls = parameters.get("urls") - timeRange = parameters.get("timeRange") - country = parameters.get("country") - language = parameters.get("language") + # Init progress logger + operationId = f"web_research_{self.services.currentWorkflow.id}_{int(time.time())}" - # Build AI call options for web research - options = AiCallOptions( - operationType=OperationTypeEnum.WEB_RESEARCH, - resultFormat="json" + # Start progress tracking + self.services.workflow.progressLogStart( + operationId, + "Web Research", + "Searching and Crawling", + "Extracting URLs and Content" ) - # Create unified prompt JSON for web research - promptData = { - "researchPrompt": researchPrompt, - "maxResults": maxResults, - "urls": urls, - "timeRange": timeRange, - "country": country, - "language": language, - "instructions": "Conduct comprehensive web research and return a JSON response with 'results' array containing objects with 'title', 'url', 'content', and 'analysis' fields. Provide detailed analysis and insights." 
- } - - import json - prompt = json.dumps(promptData, indent=2) - - # Call AI service through unified path - result = await self.services.ai.callAiDocuments( + # Call webcrawl service - service handles all AI intention analysis and processing + result = await self.services.web.performWebResearch( prompt=prompt, - documents=None, - options=options, - outputFormat="json" + urls=parameters.get("list(url)", []), + country=parameters.get("country"), + language=parameters.get("language"), + researchDepth=parameters.get("researchDepth", "general"), + operationId=operationId ) + # Complete progress tracking + self.services.workflow.progressLogFinish(operationId, True) + # Create meaningful filename meaningfulName = self._generateMeaningfulFileName( base_name="web_research", @@ -447,157 +223,10 @@ class MethodAi(MethodBase): except Exception as e: logger.error(f"Error in web research: {str(e)}") - return ActionResult.isFailure(error=str(e)) - - - @action - async def webQuestions(self, parameters: Dict[str, Any]) -> ActionResult: - """ - GENERAL: - - Purpose: Answer questions using web research and AI analysis. - - Input requirements: question (required); optional context, maxResults, timeRange, country, language. - - Output format: JSON with question answer and supporting sources. - - Parameters: - - question (str, required): Question to be answered using web research. - - context (str, optional): Additional context for the question. - - maxResults (int, optional): Maximum search results. Default: 5. - - timeRange (str, optional): d | w | m | y for time filtering. - - country (str, optional): Country name for localized results. - - language (str, optional): Language code (e.g., de, en, fr). - """ - try: - question = parameters.get("question") - if not question: - return ActionResult.isFailure(error="Question is required") - - # Extract optional parameters - context = parameters.get("context", "") - maxResults = parameters.get("maxResults", 5) - timeRange = parameters.get("timeRange") - country = parameters.get("country") - language = parameters.get("language") - - # Build AI call options for web questions - options = AiCallOptions( - operationType=OperationTypeEnum.WEB_QUESTIONS, - resultFormat="json" - ) - - # Create unified prompt JSON for web questions - promptData = { - "question": question, - "context": context, - "maxResults": maxResults, - "timeRange": timeRange, - "country": country, - "language": language, - "instructions": "Answer the question using web research and return a JSON response with 'answer', 'sources' array containing objects with 'title', 'url', 'content', and 'relevance' fields." 
-            }
-
-            import json
-            prompt = json.dumps(promptData, indent=2)
-
-            # Call AI service through unified path
-            result = await self.services.ai.callAiDocuments(
-                prompt=prompt,
-                documents=None,
-                options=options,
-                outputFormat="json"
-            )
-
-            # Create meaningful filename
-            meaningfulName = self._generateMeaningfulFileName(
-                base_name="web_questions",
-                extension="json",
-                action_name="questions"
-            )
-
-            from modules.datamodels.datamodelChat import ActionDocument
-            actionDocument = ActionDocument(
-                documentName=meaningfulName,
-                documentData=result,
-                mimeType="application/json"
-            )
-
-            return ActionResult.isSuccess(documents=[actionDocument])
-
-        except Exception as e:
-            logger.error(f"Error in web questions: {str(e)}")
-            return ActionResult.isFailure(error=str(e))
-
-
-    @action
-    async def webNews(self, parameters: Dict[str, Any]) -> ActionResult:
-        """
-        GENERAL:
-        - Purpose: Search and analyze news articles on specific topics.
-        - Input requirements: newsPrompt (required); optional maxResults, timeRange, country, language.
-        - Output format: JSON with news articles, summaries, and analysis.
-
-        Parameters:
-        - newsPrompt (str, required): Natural language prompt describing what news to search for.
-        - maxResults (int, optional): Maximum news articles. Default: 5.
-        - timeRange (str, optional): d | w | m | y for time filtering. Default: w.
-        - country (str, optional): Country name for localized news.
-        - language (str, optional): Language code (e.g., de, en, fr).
-        """
-        try:
-            newsPrompt = parameters.get("newsPrompt")
-            if not newsPrompt:
-                return ActionResult.isFailure(error="News prompt is required")
-
-            # Extract optional parameters
-            maxResults = parameters.get("maxResults", 5)
-            timeRange = parameters.get("timeRange", "w")  # Default to week
-            country = parameters.get("country")
-            language = parameters.get("language")
-
-            # Build AI call options for web news
-            options = AiCallOptions(
-                operationType=OperationTypeEnum.WEB_NEWS,
-                resultFormat="json"
-            )
-
-            # Create unified prompt JSON for web news
-            promptData = {
-                "newsPrompt": newsPrompt,
-                "maxResults": maxResults,
-                "timeRange": timeRange,
-                "country": country,
-                "language": language,
-                "instructions": "Find and analyze recent news articles and return a JSON response with 'articles' array containing objects with 'title', 'url', 'content', 'date', 'source', and 'summary' fields."
-            }
-
-            import json
-            prompt = json.dumps(promptData, indent=2)
-
-            # Call AI service through unified path
-            result = await self.services.ai.callAiDocuments(
-                prompt=prompt,
-                documents=None,
-                options=options,
-                outputFormat="json"
-            )
-
-            # Create meaningful filename
-            meaningfulName = self._generateMeaningfulFileName(
-                base_name="web_news",
-                extension="json",
-                action_name="news"
-            )
-
-            from modules.datamodels.datamodelChat import ActionDocument
-            actionDocument = ActionDocument(
-                documentName=meaningfulName,
-                documentData=result,
-                mimeType="application/json"
-            )
-
-            return ActionResult.isSuccess(documents=[actionDocument])
-
-        except Exception as e:
-            logger.error(f"Error in web news: {str(e)}")
+            try:
+                self.services.workflow.progressLogFinish(operationId, False)
+            except Exception:
+                pass
             return ActionResult.isFailure(error=str(e))
@@ -631,17 +260,16 @@ class MethodAi(MethodBase):
                 resultFormat="base64"
             )

-            # Create unified prompt JSON for image generation
-            promptData = {
-                "prompt": prompt,
-                "size": size,
-                "quality": quality,
-                "style": style,
-                "instructions": "Generate an image based on the prompt and return the base64 encoded image data."
-            }
+            # Create structured prompt using Pydantic model
+            promptModel = AiCallPromptImage(
+                prompt=prompt,
+                size=size,
+                quality=quality,
+                style=style
+            )

-            import json
-            promptJson = json.dumps(promptData, indent=2)
+            # Convert to JSON string for prompt
+            promptJson = promptModel.model_dump_json(exclude_none=True, indent=2)

             # Call AI service through unified path
             result = await self.services.ai.callAiDocuments(
diff --git a/test_ai_models.py b/test_ai_models.py
index 9d841829..2906afd1 100644
--- a/test_ai_models.py
+++ b/test_ai_models.py
@@ -91,26 +91,18 @@ class AIModelsTester:
         print(f"TESTING MODEL: {modelName}")
         print(f"{'='*60}")

-        # Choose test prompt based on model type - Web models get JSON formatted prompts
+        # Use the same prompt for all web models
         import json
-        if "tavily" in modelName.lower():
-            # Tavily models get web search prompt in JSON format (from methodAi.py)
+        if "tavily" in modelName.lower() or "perplexity" in modelName.lower() or "llama" in modelName.lower() or "sonar" in modelName.lower() or "mistral" in modelName.lower():
+            # All web models use the same JSON formatted prompt
+            # Country format: Tavily expects the full name (Switzerland); Perplexity converts ISO codes to names
             testPrompt = json.dumps({
-                "searchPrompt": "Search for recent news about artificial intelligence developments in 2024. Return the top 3 results as JSON with fields: title, url, snippet.",
-                "maxResults": 3,
-                "timeRange": "y",
-                "country": "United States",
-                "instructions": "Search the web and return a JSON response with a 'results' array containing objects with 'title', 'url', and optionally 'content' fields. Focus on finding relevant URLs for the search prompt."
-            }, indent=2)
-        elif "perplexity" in modelName.lower() or "llama" in modelName.lower() or "sonar" in modelName.lower() or "mistral" in modelName.lower():
-            # Perplexity models get web research prompt in JSON format (from methodAi.py)
-            testPrompt = json.dumps({
-                "researchPrompt": "Research the latest trends in renewable energy technology. Provide a comprehensive overview with key developments, companies involved, and future prospects. Return as JSON.",
+                "prompt": "Research what the company ValueOn in Switzerland does and who works there. Return as JSON.",
                 "maxResults": 5,
                 "timeRange": "y",
-                "country": "United States",
-                "instructions": "Conduct comprehensive web research and return a JSON response with 'results' array containing objects with 'title', 'url', 'content', and 'analysis' fields. Provide detailed analysis and insights."
+                "country": "CH",  # ISO-2 code, Perplexity will convert to "Switzerland"
+                "format": "json"
             }, indent=2)
         else:
             # Fallback for other models
@@ -444,9 +436,7 @@ Is Valid JSON: {result.get('isValidJson', False)}
        # "dall-e-3",              # Skipped - image generation, test later
         "sonar",                  # Perplexity web model
         "sonar-pro",              # Perplexity web model
-        "tavily-search",          # Tavily web model
-        "tavily-extract",         # Tavily web model
-        "tavily-search-extract",  # Tavily web model
+        "tavily-search",          # Tavily web model (unified research)
        # "internal-extractor",    # Skipped - internal model, test later
        # "internal-generator",    # Skipped - internal model, test later
        # "internal-renderer"      # Skipped - internal model, test later
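
Illustrative only, not part of the patch: a minimal sketch of how the reworked webResearch action is expected to be invoked after this change. The parameter keys mirror the new docstring (prompt, list(url), country, language, researchDepth); the methodAi instance, its wiring, and the exact shape of the returned ActionResult are assumptions.

from typing import Any, Dict

async def runWebResearchExample(methodAi) -> None:
    # methodAi is assumed to be an already constructed MethodAi with its services wired up.
    parameters: Dict[str, Any] = {
        "prompt": "Summarize recent developments on the topic from the last month",
        "list(url)": [],              # optional explicit URLs to crawl
        "country": "ch",              # two-letter ISO code, lowercase (per the new docstring)
        "language": "de",
        "researchDepth": "general",   # fast | general | deep
    }
    result = await methodAi.webResearch(parameters)
    # On success the result should carry one JSON ActionDocument (named along the lines of web_research_*.json).
    print(result)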
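
The progress-logging pattern introduced above (progressLogStart before the call, progressLogFinish(operationId, True/False) afterwards, with the failure-path call wrapped in its own try/except so bookkeeping can never mask the original error) generalizes beyond webResearch. A hedged sketch, using only the calls that appear in this diff; the services object and the re-raising behaviour are assumptions.

async def runTrackedResearch(services, operationId: str, prompt: str):
    # Start progress tracking (arguments as used in the diff).
    services.workflow.progressLogStart(
        operationId,
        "Web Research",
        "Searching and Crawling",
        "Extracting URLs and Content"
    )
    try:
        result = await services.web.performWebResearch(
            prompt=prompt,
            urls=[],
            country=None,
            language=None,
            researchDepth="general",
            operationId=operationId
        )
        services.workflow.progressLogFinish(operationId, True)
        return result
    except Exception:
        # Mark the operation as failed, but never let the bookkeeping call hide the original error.
        try:
            services.workflow.progressLogFinish(operationId, False)
        except Exception:
            pass
        raise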
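
For the image-prompt change, model_dump_json(exclude_none=True, indent=2) is standard Pydantic v2 behaviour: optional fields left as None are dropped and the JSON is pretty-printed. A small stand-alone sketch with a stand-in model; the real field set of AiCallPromptImage lives in the datamodel and may differ.

from typing import Optional
from pydantic import BaseModel

class ImagePromptSketch(BaseModel):
    # Stand-in for AiCallPromptImage; the field names here are assumptions.
    prompt: str
    size: Optional[str] = None
    quality: Optional[str] = None
    style: Optional[str] = None

promptJson = ImagePromptSketch(prompt="a red bicycle", size="1024x1024").model_dump_json(
    exclude_none=True, indent=2
)
print(promptJson)  # quality and style are omitted because they are None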