# Copyright (c) 2025 Patrick Motsch # All rights reserved. import json import logging import httpx import os from typing import Dict, Any, List, AsyncGenerator, Optional, Union from fastapi import HTTPException from modules.shared.configuration import APP_CONFIG from .aicoreBase import BaseConnectorAi, RateLimitExceededException from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings # Configure logger logger = logging.getLogger(__name__) def loadConfigData(): """Load configuration data for Anthropic connector""" return { "apiKey": APP_CONFIG.get('Connector_AiAnthropic_API_SECRET'), } class AiAnthropic(BaseConnectorAi): """Connector for communication with the Anthropic API.""" def __init__(self): super().__init__() # Load configuration self.config = loadConfigData() self.apiKey = self.config["apiKey"] # HttpClient for API calls # Timeout set to 600 seconds (10 minutes) for complex requests that may take longer # Document generation and complex AI operations can take significantly longer self.httpClient = httpx.AsyncClient( timeout=600.0, headers={ "x-api-key": self.apiKey, "anthropic-version": "2023-06-01", # Anthropic API Version "Content-Type": "application/json" } ) logger.info("Anthropic Connector initialized") def getConnectorType(self) -> str: """Get the connector type identifier.""" return "anthropic" def getModels(self) -> List[AiModel]: # Get all available Anthropic models. return [ AiModel( name="claude-sonnet-4-5-20250929", displayName="Anthropic Claude Sonnet 4.5", connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.003, # $3/M tokens (updated 2026-02) costPer1kTokensOutput=0.015, # $15/M tokens (updated 2026-02) speedRating=6, # Slower due to high-quality processing qualityRating=10, # Best quality available functionCall=self.callAiBasic, functionCallStream=self.callAiBasicStream, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.PLAN, 9), (OperationTypeEnum.DATA_ANALYSE, 9), (OperationTypeEnum.DATA_GENERATE, 9), (OperationTypeEnum.DATA_EXTRACT, 8), (OperationTypeEnum.AGENT, 9), ), version="claude-sonnet-4-5-20250929", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015 ), AiModel( name="claude-haiku-4-5-20251001", displayName="Anthropic Claude Haiku 4.5", connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.001, # $1/M tokens (updated 2026-02) costPer1kTokensOutput=0.005, # $5/M tokens (updated 2026-02) speedRating=9, # Very fast, lightweight model qualityRating=8, # Good quality, cost-efficient functionCall=self.callAiBasic, functionCallStream=self.callAiBasicStream, priority=PriorityEnum.SPEED, processingMode=ProcessingModeEnum.BASIC, operationTypes=createOperationTypeRatings( (OperationTypeEnum.PLAN, 8), (OperationTypeEnum.DATA_ANALYSE, 8), (OperationTypeEnum.DATA_GENERATE, 8), (OperationTypeEnum.DATA_EXTRACT, 7), (OperationTypeEnum.AGENT, 7), ), version="claude-haiku-4-5-20251001", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.001 + (bytesReceived / 4 / 1000) * 0.005 ), AiModel( name="claude-opus-4-6", displayName="Anthropic Claude Opus 4.6", connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.005, # $5/M tokens (updated 2026-02) costPer1kTokensOutput=0.025, # $25/M tokens (updated 2026-02) speedRating=5, # Moderate latency, most capable qualityRating=10, # Top-tier intelligence functionCall=self.callAiBasic, functionCallStream=self.callAiBasicStream, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.PLAN, 10), (OperationTypeEnum.DATA_ANALYSE, 8), (OperationTypeEnum.DATA_GENERATE, 10), (OperationTypeEnum.DATA_EXTRACT, 9), (OperationTypeEnum.AGENT, 10), ), version="claude-opus-4-6", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.005 + (bytesReceived / 4 / 1000) * 0.025 ), AiModel( name="claude-sonnet-4-5-20250929", displayName="Anthropic Claude Sonnet 4.5 Vision", connectorType="anthropic", apiUrl="https://api.anthropic.com/v1/messages", temperature=0.2, maxTokens=8192, contextLength=200000, costPer1kTokensInput=0.003, # $3/M tokens (updated 2026-02) costPer1kTokensOutput=0.015, # $15/M tokens (updated 2026-02) speedRating=6, qualityRating=10, functionCall=self.callAiImage, priority=PriorityEnum.QUALITY, processingMode=ProcessingModeEnum.DETAILED, operationTypes=createOperationTypeRatings( (OperationTypeEnum.IMAGE_ANALYSE, 10) ), version="claude-sonnet-4-5-20250929", calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.003 + (bytesReceived / 4 / 1000) * 0.015 ) ] async def callAiBasic(self, modelCall: AiModelCall) -> AiModelResponse: """ Calls the Anthropic API with the given messages using standardized pattern. Args: modelCall: AiModelCall with messages and options Returns: AiModelResponse with content and metadata Raises: HTTPException: For errors in API communication """ try: model = modelCall.model options = modelCall.options temperature = getattr(options, "temperature", None) if temperature is None: temperature = model.temperature maxTokens = model.maxTokens converted_messages, system_prompt = _convertMessagesForAnthropic(modelCall.messages) payload: Dict[str, Any] = { "model": model.name, "messages": converted_messages, "temperature": temperature, } # Anthropic requires max_tokens - use provided value or throw error if maxTokens is None: raise ValueError("maxTokens must be provided for Anthropic API calls") payload["max_tokens"] = maxTokens if system_prompt: payload["system"] = system_prompt if modelCall.tools: payload["tools"] = _convertToolsToAnthropicFormat(modelCall.tools) if modelCall.toolChoice: payload["tool_choice"] = modelCall.toolChoice else: payload["tool_choice"] = {"type": "auto"} response = await self.httpClient.post( model.apiUrl, json=payload ) if response.status_code != 200: error_detail = f"Anthropic API error: {response.status_code} - {response.text}" logger.error(error_detail) if response.status_code == 429: raise RateLimitExceededException( f"Rate limit exceeded for {model.name}: {response.text}" ) if response.status_code == 529: error_message = "Anthropic API is currently overloaded. Please try again in a few minutes." elif response.status_code == 401: error_message = "Invalid API key. Please check your Anthropic API configuration." elif response.status_code == 400: error_message = f"Invalid request to Anthropic API: {response.text}" else: error_message = f"Anthropic API error ({response.status_code}): {response.text}" raise HTTPException(status_code=500, detail=error_message) # Parse response anthropicResponse = response.json() # Extract content and tool_use blocks from response content = "" toolCalls = [] if "content" in anthropicResponse: if isinstance(anthropicResponse["content"], list): for part in anthropicResponse["content"]: if part.get("type") == "text": content += part.get("text", "") elif part.get("type") == "tool_use": toolCalls.append({ "id": part.get("id", ""), "type": "function", "function": { "name": part.get("name", ""), "arguments": json.dumps(part.get("input", {})) if isinstance(part.get("input"), dict) else str(part.get("input", "{}")) } }) else: content = anthropicResponse["content"] if not content and not toolCalls: logger.warning(f"Anthropic API returned empty content. Full response: {anthropicResponse}") content = "[Anthropic API returned empty response]" metadata = {"response_id": anthropicResponse.get("id", "")} if toolCalls: metadata["toolCalls"] = toolCalls return AiModelResponse( content=content, success=True, modelId=model.name, metadata=metadata ) except (RateLimitExceededException, HTTPException): raise except Exception as e: error_msg = str(e) if str(e) else f"{type(e).__name__}" error_detail = f"Error calling Anthropic API: {error_msg}" if hasattr(e, 'detail') and e.detail: error_detail += f" | Detail: {e.detail}" if hasattr(e, 'status_code'): error_detail += f" | Status: {e.status_code}" logger.error(error_detail, exc_info=True) raise HTTPException(status_code=500, detail=error_detail) async def callAiBasicStream(self, modelCall: AiModelCall) -> AsyncGenerator[Union[str, AiModelResponse], None]: """Stream Anthropic response. Yields str deltas, then final AiModelResponse.""" try: model = modelCall.model options = modelCall.options temperature = getattr(options, "temperature", None) if temperature is None: temperature = model.temperature converted, system_prompt = _convertMessagesForAnthropic(modelCall.messages) payload: Dict[str, Any] = { "model": model.name, "messages": converted, "temperature": temperature, "max_tokens": model.maxTokens, "stream": True, } if system_prompt: payload["system"] = system_prompt if modelCall.tools: payload["tools"] = _convertToolsToAnthropicFormat(modelCall.tools) payload["tool_choice"] = modelCall.toolChoice or {"type": "auto"} fullContent = "" toolUseBlocks: Dict[int, Dict[str, Any]] = {} currentToolIdx = -1 stopReason: Optional[str] = None async with self.httpClient.stream("POST", model.apiUrl, json=payload) as response: if response.status_code != 200: body = await response.aread() bodyStr = body.decode() if response.status_code == 429: raise RateLimitExceededException( f"Rate limit exceeded for {model.name}: {bodyStr}" ) raise HTTPException(status_code=500, detail=f"Anthropic stream error: {response.status_code} - {bodyStr}") async for line in response.aiter_lines(): if not line.startswith("data: "): continue try: event = json.loads(line[6:]) except json.JSONDecodeError: continue eventType = event.get("type", "") if eventType == "error": errDetail = event.get("error", {}) errMsg = errDetail.get("message", str(errDetail)) errType = errDetail.get("type", "unknown") logger.error(f"Anthropic stream error event: type={errType}, message={errMsg}") if "overloaded" in errMsg.lower() or "overloaded" in errType.lower(): raise HTTPException(status_code=500, detail=f"Anthropic API is currently overloaded. Please try again in a few minutes.") raise HTTPException(status_code=500, detail=f"Anthropic stream error: [{errType}] {errMsg}") elif eventType == "content_block_start": block = event.get("content_block", {}) idx = event.get("index", 0) if block.get("type") == "tool_use": currentToolIdx = idx toolUseBlocks[idx] = { "id": block.get("id", ""), "name": block.get("name", ""), "arguments": "", } elif eventType == "content_block_delta": delta = event.get("delta", {}) if delta.get("type") == "text_delta": text = delta.get("text", "") fullContent += text yield text elif delta.get("type") == "input_json_delta": idx = event.get("index", currentToolIdx) if idx in toolUseBlocks: toolUseBlocks[idx]["arguments"] += delta.get("partial_json", "") elif eventType == "message_delta": delta = event.get("delta", {}) stopReason = delta.get("stop_reason", stopReason) elif eventType == "message_stop": break if not fullContent and not toolUseBlocks: logger.warning( f"Anthropic stream returned empty response: model={model.name}, " f"stopReason={stopReason}" ) metadata: Dict[str, Any] = {} if stopReason: metadata["stopReason"] = stopReason if toolUseBlocks: metadata["toolCalls"] = [ { "id": tb["id"], "type": "function", "function": { "name": tb["name"], "arguments": tb["arguments"], }, } for tb in toolUseBlocks.values() ] yield AiModelResponse( content=fullContent, success=True, modelId=model.name, metadata=metadata, ) except (RateLimitExceededException, HTTPException): raise except Exception as e: logger.error(f"Error streaming Anthropic API: {e}", exc_info=True) raise HTTPException(status_code=500, detail=f"Error streaming Anthropic API: {e}") async def callAiImage(self, modelCall: AiModelCall) -> AiModelResponse: """ Analyzes an image using Anthropic's vision capabilities using standardized pattern. Args: modelCall: AiModelCall with messages and image data in options Returns: AiModelResponse with analysis content """ try: # Extract parameters from messages for Anthropic Vision API messages = modelCall.messages model = modelCall.model # Verify messages contain image data if not messages or not messages[0].get("content"): raise ValueError("No messages provided for image analysis") logger.info(f"callAiImage called with {len(messages)} message(s)...") # Extract text prompt and image data from messages # Messages format: [{"role": "user", "content": [{"type": "text", "text": "..."}, {"type": "image_url", "image_url": {"url": "data:..."}}]}] userContent = messages[0]["content"] if not isinstance(userContent, list): raise ValueError("Expected content to be a list for vision") textPrompt = "" imageUrl = None for contentItem in userContent: if contentItem.get("type") == "text": textPrompt = contentItem.get("text", "") or "" elif contentItem.get("type") == "image_url": imageUrlDict = contentItem.get("image_url") if imageUrlDict and isinstance(imageUrlDict, dict): imageUrl = imageUrlDict.get("url", "") or "" else: imageUrl = None if not imageUrl or not imageUrl.startswith("data:"): raise ValueError("No image data found in messages") # Extract base64 data and mime type from data URL # Format: data:image/jpeg;base64,/9j/4AAQSkZ... parts = imageUrl.split(";base64,") if len(parts) != 2: raise ValueError("Invalid image data URL format") mimeType = parts[0].replace("data:", "") base64Data = parts[1] import base64 as _b64 try: rawHead = _b64.b64decode(base64Data[:32]) if rawHead[:3] == b"\xff\xd8\xff": mimeType = "image/jpeg" elif rawHead[:8] == b"\x89PNG\r\n\x1a\n": mimeType = "image/png" elif rawHead[:4] == b"GIF8": mimeType = "image/gif" elif rawHead[:4] == b"RIFF" and rawHead[8:12] == b"WEBP": mimeType = "image/webp" except Exception: pass # Convert to Anthropic's vision format anthropicMessages = [{ "role": "user", "content": [ {"type": "text", "text": textPrompt}, { "type": "image", "source": { "type": "base64", "media_type": mimeType, "data": base64Data } } ] }] # Call Anthropic API directly for vision import time import base64 startTime = time.time() # Prepare system prompt if available systemPrompt = None for msg in messages: if msg.get("role") == "system": systemContent = msg.get("content") if isinstance(systemContent, list): textParts = [] for item in systemContent: if item.get("type") == "text": textValue = item.get("text") if textValue is not None: textParts.append(str(textValue)) if textParts: systemPrompt = "\n".join(textParts) elif systemContent is not None: systemPrompt = str(systemContent) break # Get parameters from model (consistent with callAiBasic) maxTokens = model.maxTokens if hasattr(model, 'maxTokens') else 8192 temperature = model.temperature if hasattr(model, 'temperature') else 0.2 # Prepare API payload payload = { "model": model.name, # Use standard model.name "max_tokens": maxTokens, "messages": anthropicMessages } if systemPrompt: payload["system"] = systemPrompt # Set temperature from model payload["temperature"] = temperature # Make API call with headers from httpClient (which includes anthropic-version) response = await self.httpClient.post( "https://api.anthropic.com/v1/messages", json=payload ) if response.status_code != 200: errorText = response.text logger.error(f"Anthropic API error: {response.status_code} - {errorText}") raise HTTPException(status_code=response.status_code, detail=f"Anthropic API error: {errorText}") # Parse response result = response.json() content = result["content"][0]["text"] if result.get("content") else "" endTime = time.time() processingTime = endTime - startTime # Calculate cost inputTokens = result.get("usage", {}).get("input_tokens", 0) outputTokens = result.get("usage", {}).get("output_tokens", 0) # Return standardized response return AiModelResponse( content=content, success=True, modelId=model.name, processingTime=processingTime ) except Exception as e: logger.error(f"Error during image analysis: {str(e)}", exc_info=True) return AiModelResponse( content="", success=False, error=f"Error during image analysis: {str(e)}" ) def _convertMessagesForAnthropic(messages: List[Dict[str, Any]]): """Convert OpenAI-style messages to Anthropic format. Returns (messages, system_prompt).""" system_contents: List[str] = [] converted_messages: List[Dict[str, Any]] = [] pendingToolResults: List[Dict[str, Any]] = [] def _flush(): if not pendingToolResults: return converted_messages.append({"role": "user", "content": list(pendingToolResults)}) pendingToolResults.clear() def _collapse(content): if isinstance(content, list): return "\n\n".join( (part.get("text") if isinstance(part, dict) else str(part)) for part in content ) return str(content) if content else "" for m in messages: role = m.get("role") content = m.get("content", "") if role == "system": system_contents.append(_collapse(content)) continue if role == "tool": pendingToolResults.append({ "type": "tool_result", "tool_use_id": m.get("tool_call_id", ""), "content": str(content) if content else "", }) continue _flush() if role == "assistant" and m.get("tool_calls"): contentBlocks = [] textPart = _collapse(content) if textPart: contentBlocks.append({"type": "text", "text": textPart}) for tc in m["tool_calls"]: fn = tc.get("function", {}) inputData = fn.get("arguments", "{}") if isinstance(inputData, str): try: inputData = json.loads(inputData) except (json.JSONDecodeError, ValueError): inputData = {} contentBlocks.append({ "type": "tool_use", "id": tc.get("id", ""), "name": fn.get("name", ""), "input": inputData, }) converted_messages.append({"role": "assistant", "content": contentBlocks}) continue converted_messages.append({"role": role, "content": _collapse(content)}) _flush() merged: List[Dict[str, Any]] = [] for msg in converted_messages: if merged and merged[-1]["role"] == msg["role"]: prev = merged[-1] pc, nc = prev["content"], msg["content"] if isinstance(pc, str) and isinstance(nc, str): prev["content"] = pc + "\n\n" + nc elif isinstance(pc, list) and isinstance(nc, list): prev["content"] = pc + nc elif isinstance(pc, str) and isinstance(nc, list): prev["content"] = [{"type": "text", "text": pc}] + nc elif isinstance(pc, list) and isinstance(nc, str): prev["content"] = pc + [{"type": "text", "text": nc}] else: merged.append(msg) system_prompt = "\n\n".join([s for s in system_contents if s]) if system_contents else None return merged, system_prompt def _convertToolsToAnthropicFormat(openaiTools: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """Convert OpenAI-style tool definitions to Anthropic format.""" anthropicTools = [] for tool in openaiTools: if tool.get("type") == "function": fn = tool["function"] anthropicTools.append({ "name": fn["name"], "description": fn.get("description", ""), "input_schema": fn.get("parameters", {"type": "object", "properties": {}}) }) return anthropicTools