# Copyright (c) 2025 Patrick Motsch # All rights reserved. import logging import httpx from typing import List from fastapi import HTTPException from modules.shared.configuration import APP_CONFIG from .aicoreBase import BaseConnectorAi from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings, AiCallPromptWebSearch, AiCallPromptWebCrawl, AiCallOptions from modules.datamodels.datamodelTools import CountryCodes # Configure logger logger = logging.getLogger(__name__) def loadConfigData(): """Load configuration data for Perplexity connector""" return { "apiKey": APP_CONFIG.get('Connector_AiPerplexity_API_SECRET'), } class AiPerplexity(BaseConnectorAi): """Connector for communication with the Perplexity API.""" def __init__(self): super().__init__() # Load configuration self.config = loadConfigData() self.apiKey = self.config["apiKey"] # HttpClient for API calls self.httpClient = httpx.AsyncClient( timeout=600.0, # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer headers={ "Authorization": f"Bearer {self.apiKey}", "Content-Type": "application/json", "Accept": "application/json" } ) logger.info("Perplexity Connector initialized") def getConnectorType(self) -> str: """Get the connector type identifier.""" return "perplexity" def _convertIsoCodeToCountryName(self, isoCode: str) -> str: """ Convert ISO-2 country code to Perplexity country name. Uses centralized CountryCodes mapping. """ return CountryCodes.getForPerplexity(isoCode) def getModels(self) -> List[AiModel]: """Get all available Perplexity models.""" return [ AiModel( name="sonar", displayName="Perplexity Sonar", connectorType="perplexity", apiUrl="https://api.perplexity.ai/chat/completions", temperature=0.2, maxTokens=24000, contextLength=127000, # 127K context window (updated 2026-02) costPer1kTokensInput=0.001, # $1/M tokens (updated 2026-02) costPer1kTokensOutput=0.001, # $1/M tokens (updated 2026-02) speedRating=8, qualityRating=8, functionCall=self._routeWebOperation, priority=PriorityEnum.BALANCED, processingMode=ProcessingModeEnum.ADVANCED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.WEB_SEARCH_DATA, 9), (OperationTypeEnum.WEB_CRAWL, 7) ), version="sonar", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.001 + (bytesReceived / 4 / 1000) * 0.001 ), AiModel( name="sonar-pro", displayName="Perplexity Sonar Pro", connectorType="perplexity", apiUrl="https://api.perplexity.ai/chat/completions", temperature=0.2, maxTokens=24000, contextLength=200000, # 200K context window (updated 2026-02) costPer1kTokensInput=0.003, # $3/M tokens (updated 2026-02) costPer1kTokensOutput=0.015, # $15/M tokens (updated 2026-02) speedRating=6, # Slower due to AI analysis qualityRating=9, # Best AI analysis quality functionCall=self._routeWebOperation, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.WEB_SEARCH_DATA, 9), (OperationTypeEnum.WEB_CRAWL, 8) ), version="sonar-pro", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015 ) ] async def callAiBasic(self, modelCall: AiModelCall) -> AiModelResponse: """ Calls the Perplexity API with the given messages using standardized pattern. Args: modelCall: AiModelCall with messages and options Returns: AiModelResponse with content and metadata Raises: HTTPException: For errors in API communication """ try: # Extract parameters from modelCall messages = modelCall.messages model = modelCall.model options = modelCall.options temperature = getattr(options, "temperature", None) if temperature is None: temperature = model.temperature maxTokens = model.maxTokens payload = { "model": model.name, "messages": messages, "temperature": temperature, "max_tokens": maxTokens } response = await self.httpClient.post( model.apiUrl, json=payload ) if response.status_code != 200: errorDetail = f"Perplexity API error: {response.status_code} - {response.text}" logger.error(errorDetail) # Provide more specific error messages based on status code if response.status_code == 429: errorMessage = "Rate limit exceeded. Please wait before making another request." elif response.status_code == 401: errorMessage = "Invalid API key. Please check your Perplexity API configuration." elif response.status_code == 400: errorMessage = f"Invalid request to Perplexity API: {response.text}" else: errorMessage = f"Perplexity API error ({response.status_code}): {response.text}" raise HTTPException(status_code=500, detail=errorMessage) apiResponse = response.json() content = apiResponse["choices"][0]["message"]["content"] return AiModelResponse( content=content, success=True, modelId=model.name, metadata={"response_id": apiResponse.get("id", "")} ) except Exception as e: logger.error(f"Error calling Perplexity API: {str(e)}") raise HTTPException(status_code=500, detail=f"Error calling Perplexity API: {str(e)}") async def _testConnection(self) -> bool: """ Tests the connection to the Perplexity API. Returns: True if connection is successful, False otherwise """ try: # Try a simple test message testMessages = [ {"role": "user", "content": "Hello, please respond with just 'OK' to confirm the connection works."} ] # Create a model call for testing model = self.getModels()[0] # Get first model for testing testCall = AiModelCall( messages=testMessages, model=model, options=AiCallOptions() ) response = await self.callAiBasic(testCall) return response.success and len(response.content.strip()) > 0 except Exception as e: logger.error(f"Perplexity connection test failed: {str(e)}") return False async def _routeWebOperation(self, modelCall: AiModelCall) -> AiModelResponse: """ Route web operation based on operation type. Args: modelCall: AiModelCall with messages and options Returns: AiModelResponse based on operation type """ operationType = modelCall.options.operationType if operationType == OperationTypeEnum.WEB_SEARCH_DATA: return await self.webSearch(modelCall) elif operationType == OperationTypeEnum.WEB_CRAWL: return await self.webCrawl(modelCall) else: # Fallback to basic call return await self.callAiBasic(modelCall) def _getDepthInstructions(self, maxDepth: int) -> str: """ Map maxDepth (numeric) to instructional text for LLM. Args: maxDepth: 1 (fast/overview), 2 (general/standard), 3 (deep/comprehensive) Returns: Instructional text for the LLM """ depthMap = { 1: "Basic overview - extract main content from the main page only", 2: "Standard crawl - extract content from main page and linked pages (2 levels deep)", 3: "Deep crawl - comprehensively extract content from main page and all accessible linked pages (3+ levels deep)" } return depthMap.get(maxDepth, depthMap[2]) def _getWidthInstructions(self, maxWidth: int) -> str: """ Map maxWidth (numeric) to instructional text for LLM. Args: maxWidth: Number of pages to crawl at each level (default: 10) Returns: Instructional text for the LLM """ if maxWidth <= 5: return f"Focused crawl - limit to {maxWidth} most relevant pages per level" elif maxWidth <= 15: return f"Standard breadth - crawl up to {maxWidth} pages per level" elif maxWidth <= 30: return f"Wide crawl - crawl up to {maxWidth} pages per level, prioritize quality" else: return f"Extensive crawl - crawl up to {maxWidth} pages per level, comprehensive coverage" async def webSearch(self, modelCall: AiModelCall) -> AiModelResponse: """ WEB_SEARCH_DATA operation - returns list of URLs based on search query. Args: modelCall: AiModelCall with AiCallPromptWebSearch as prompt Returns: AiModelResponse with JSON list of URLs """ try: # Extract parameters messages = modelCall.messages model = modelCall.model options = modelCall.options temperature = getattr(options, "temperature", None) or model.temperature maxTokens = model.maxTokens # Parse prompt JSON - find user message (not system message) promptContent = "" if messages: for msg in messages: if msg.get("role") == "user": promptContent = msg.get("content", "") break # Fallback to first message if no user message found if not promptContent and len(messages) > 0: promptContent = messages[0].get("content", "") import json promptData = json.loads(promptContent) # Create Pydantic model webSearchPrompt = AiCallPromptWebSearch(**promptData) # Convert ISO country code to country name countryName = webSearchPrompt.country if countryName: countryName = self._convertIsoCodeToCountryName(countryName) # Build search request for Perplexity searchPrompt = f"""Search the web for: {webSearchPrompt.instruction} Return a JSON array of {webSearchPrompt.maxNumberPages} most relevant URLs. {'' if not countryName else f'Focus on results from {countryName}.'} Return ONLY a JSON array of URLs, no additional text: [ "https://example1.com/page", "https://example2.com/article", "https://example3.com/resource" ]""" payload = { "model": model.name, "messages": [{"role": "user", "content": searchPrompt}], "temperature": temperature, "max_tokens": maxTokens } response = await self.httpClient.post(model.apiUrl, json=payload) if response.status_code != 200: raise HTTPException(status_code=500, detail=f"Perplexity Web Search API error: {response.text}") # Check if response body is empty or invalid responseText = response.text if not responseText or not responseText.strip(): raise HTTPException(status_code=500, detail="Perplexity Web Search API returned empty response") try: apiResponse = response.json() except Exception as jsonError: logger.error(f"Failed to parse Perplexity response as JSON. Status: {response.status_code}, Response: {responseText[:500]}") raise HTTPException(status_code=500, detail=f"Perplexity Web Search API returned invalid JSON: {str(jsonError)}") if "choices" not in apiResponse or not apiResponse["choices"]: raise HTTPException(status_code=500, detail="Perplexity Web Search API response missing 'choices' field") content = apiResponse["choices"][0]["message"]["content"] return AiModelResponse( content=content, success=True, modelId=model.name, metadata={"response_id": apiResponse.get("id", ""), "operation": "WEB_SEARCH_DATA"} ) except Exception as e: logger.error(f"Error in Perplexity web search: {str(e)}") raise HTTPException(status_code=500, detail=f"Error in Perplexity web search: {str(e)}") async def webCrawl(self, modelCall: AiModelCall) -> AiModelResponse: """ WEB_CRAWL operation - crawls ONE URL and returns content. Perplexity API Parameters Used: - messages: The prompt containing URL and instruction - max_tokens: Maximum response length - max_results: Number of search results (1-20, default: 10) - temperature: Response randomness (not web search specific) Pagination: Perplexity does NOT return paginated responses. A single response contains all results within max_tokens limit. Args: modelCall: AiModelCall with AiCallPromptWebCrawl as prompt Returns: AiModelResponse with crawl results as JSON object """ try: # Extract parameters messages = modelCall.messages model = modelCall.model options = modelCall.options temperature = getattr(options, "temperature", None) or model.temperature maxTokens = model.maxTokens # Parse prompt JSON - find user message (not system message) promptContent = "" if messages: for msg in messages: if msg.get("role") == "user": promptContent = msg.get("content", "") break # Fallback to first message if no user message found if not promptContent and len(messages) > 0: promptContent = messages[0].get("content", "") import json promptData = json.loads(promptContent) # Create Pydantic model webCrawlPrompt = AiCallPromptWebCrawl(**promptData) # Build crawl request for Perplexity - ONE URL # Match playground prompt style: just URL + question # This allows Perplexity to return detailed multi-source results crawlPrompt = f"{webCrawlPrompt.url}: {webCrawlPrompt.instruction}" # Build payload with optional Perplexity parameters # Note: max_tokens_per_page may not be supported by chat/completions endpoint # The playground Python SDK might use a different internal API maxResults = min(webCrawlPrompt.maxWidth or 10, 20) # Max 20 results payload = { "model": model.name, "messages": [{"role": "user", "content": crawlPrompt}], "temperature": temperature, "max_tokens": maxTokens, # Use model's configured maxTokens (24000) "max_results": maxResults, "return_citations": True # Request citations explicitly } logger.info(f"Perplexity crawl payload: model={model.name}, prompt_length={len(crawlPrompt)}, max_tokens={maxTokens}, max_results={maxResults}") response = await self.httpClient.post(model.apiUrl, json=payload) if response.status_code != 200: raise HTTPException(status_code=500, detail=f"Perplexity Web Crawl API error: {response.text}") # Check if response body is empty or invalid responseText = response.text if not responseText or not responseText.strip(): raise HTTPException(status_code=500, detail="Perplexity Web Crawl API returned empty response") try: apiResponse = response.json() except Exception as jsonError: logger.error(f"Failed to parse Perplexity response as JSON. Status: {response.status_code}, Response: {responseText[:500]}") raise HTTPException(status_code=500, detail=f"Perplexity Web Crawl API returned invalid JSON: {str(jsonError)}") if "choices" not in apiResponse or not apiResponse["choices"]: raise HTTPException(status_code=500, detail="Perplexity Web Crawl API response missing 'choices' field") # Extract the main content content = apiResponse["choices"][0]["message"]["content"] # Check for citations or search results in the response citations = apiResponse.get("citations", []) searchResults = apiResponse.get("search_results", []) # Log what we found if citations: logger.info(f"Found {len(citations)} citations in response") if searchResults: logger.info(f"Found {len(searchResults)} search results in response") logger.debug(f"API response keys: {list(apiResponse.keys())}") # Build comprehensive response with citations if available import json responseData = { "content": content, "citations": citations if citations else [], "search_results": searchResults if searchResults else [] } # Return comprehensive response return AiModelResponse( content=json.dumps(responseData, indent=2) if (citations or searchResults) else content, success=True, modelId=model.name, metadata={ "response_id": apiResponse.get("id", ""), "operation": "WEB_CRAWL", "url": webCrawlPrompt.url, "actualPromptSent": crawlPrompt, "has_citations": len(citations) > 0, "has_search_results": len(searchResults) > 0 } ) except Exception as e: logger.error(f"Error in Perplexity web crawl: {str(e)}") raise HTTPException(status_code=500, detail=f"Error in Perplexity web crawl: {str(e)}")