# gateway/modules/aicore/aicorePluginOpenai.py
# (repository-viewer metadata, preserved as a comment: ValueOn AG 259fd25d9b fixes,
#  2026-04-09 21:33:56 +02:00, 591 lines, 26 KiB, Python, no EOL)
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
import json as _json
import httpx
from typing import List, Dict, Any, AsyncGenerator, Union
from fastapi import HTTPException
from modules.shared.configuration import APP_CONFIG
from .aicoreBase import BaseConnectorAi, RateLimitExceededException, ContextLengthExceededException
from modules.datamodels.datamodelAi import AiModel, PriorityEnum, ProcessingModeEnum, OperationTypeEnum, AiModelCall, AiModelResponse, createOperationTypeRatings, AiCallPromptImage
logger = logging.getLogger(__name__)
def loadConfigData():
    """Read the OpenAI connector settings from the application configuration."""
    return {"apiKey": APP_CONFIG.get('Connector_AiOpenai_API_SECRET')}
class AiOpenai(BaseConnectorAi):
    """Connector for communication with the OpenAI API."""

    def __init__(self):
        super().__init__()
        # Pull the API key out of the application configuration.
        self.config = loadConfigData()
        self.apiKey = self.config["apiKey"]
        # One shared async HTTP client for all API calls.
        # The generous 600 second (10 minute) timeout covers complex requests:
        # AiService calls can take significantly longer due to prompt building
        # and processing overhead.
        requestHeaders = {
            "Authorization": f"Bearer {self.apiKey}",
            "Content-Type": "application/json",
        }
        self.httpClient = httpx.AsyncClient(timeout=600.0, headers=requestHeaders)
        logger.info("OpenAI Connector initialized")
def getConnectorType(self) -> str:
"""Get the connector type identifier."""
return "openai"
    def getModels(self) -> List[AiModel]:
        """Get all available OpenAI models.

        Returns a static catalogue of AiModel entries (chat, vision, embedding
        and image-generation models) wired to this connector's call methods.
        The calculatepriceCHF lambdas estimate cost from byte counts using the
        rough heuristic of ~4 bytes per token (bytes / 4 / 1000 * per-1k price).
        """
        return [
            AiModel(
                name="gpt-4o",
                displayName="OpenAI GPT-4o",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/chat/completions",
                temperature=0.2,
                maxTokens=16384,
                contextLength=128000,
                maxInputTokensPerRequest=25000,  # OpenAI org TPM limit is 30K, keep 5K buffer
                costPer1kTokensInput=0.0025,  # $2.50/M tokens (updated 2026-02)
                costPer1kTokensOutput=0.01,  # $10.00/M tokens (updated 2026-02)
                speedRating=8,  # Good speed for complex tasks
                qualityRating=10,  # High quality
                functionCall=self.callAiBasic,
                functionCallStream=self.callAiBasicStream,
                priority=PriorityEnum.BALANCED,
                processingMode=ProcessingModeEnum.ADVANCED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.PLAN, 9),
                    (OperationTypeEnum.DATA_ANALYSE, 10),
                    (OperationTypeEnum.DATA_GENERATE, 10),
                    (OperationTypeEnum.DATA_EXTRACT, 7),
                    (OperationTypeEnum.AGENT, 9),
                    (OperationTypeEnum.DATA_QUERY, 8),
                ),
                version="gpt-4o",
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0025 + (bytesReceived / 4 / 1000) * 0.01
            ),
            AiModel(
                name="gpt-4o-mini",
                displayName="OpenAI GPT-4o Mini",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/chat/completions",
                temperature=0.2,
                maxTokens=16384,
                contextLength=128000,
                maxInputTokensPerRequest=25000,  # OpenAI org TPM limit, keep buffer
                costPer1kTokensInput=0.00015,  # $0.15/M tokens (updated 2026-02)
                costPer1kTokensOutput=0.0006,  # $0.60/M tokens (updated 2026-02)
                speedRating=9,  # Very fast
                qualityRating=8,  # Good quality, replaces gpt-3.5-turbo
                functionCall=self.callAiBasic,
                functionCallStream=self.callAiBasicStream,
                priority=PriorityEnum.SPEED,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.PLAN, 8),
                    (OperationTypeEnum.DATA_ANALYSE, 8),
                    (OperationTypeEnum.DATA_GENERATE, 9),
                    (OperationTypeEnum.DATA_EXTRACT, 7),
                    (OperationTypeEnum.AGENT, 8),
                    (OperationTypeEnum.DATA_QUERY, 10),
                ),
                version="gpt-4o-mini",
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00015 + (bytesReceived / 4 / 1000) * 0.0006
            ),
            # NOTE(review): this vision entry shares name="gpt-4o" with the chat
            # entry above (only displayName/functionCall differ) — confirm that
            # model lookup by name handles the duplicate as intended.
            AiModel(
                name="gpt-4o",
                displayName="OpenAI GPT-4o Vision",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/chat/completions",
                temperature=0.2,
                maxTokens=16384,
                contextLength=128000,
                maxInputTokensPerRequest=25000,  # OpenAI org TPM limit is 30K, keep 5K buffer
                costPer1kTokensInput=0.0025,  # $2.50/M tokens (updated 2026-02)
                costPer1kTokensOutput=0.01,  # $10.00/M tokens (updated 2026-02)
                speedRating=6,  # Slower for vision tasks
                qualityRating=9,  # High quality vision
                functionCall=self.callAiImage,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.DETAILED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.IMAGE_ANALYSE, 9)
                ),
                version="gpt-4o",
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.0025 + (bytesReceived / 4 / 1000) * 0.01
            ),
            AiModel(
                name="text-embedding-3-small",
                displayName="OpenAI Embedding Small",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/embeddings",
                temperature=0.0,
                maxTokens=0,
                contextLength=8191,
                costPer1kTokensInput=0.00002,  # $0.02/M tokens
                costPer1kTokensOutput=0.0,
                speedRating=10,
                qualityRating=8,
                functionCall=self.callEmbedding,
                priority=PriorityEnum.COST,
                processingMode=ProcessingModeEnum.BASIC,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.EMBEDDING, 10)
                ),
                version="text-embedding-3-small",
                # Embeddings only bill input tokens, so only bytesSent is charged.
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00002
            ),
            AiModel(
                name="text-embedding-3-large",
                displayName="OpenAI Embedding Large",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/embeddings",
                temperature=0.0,
                maxTokens=0,
                contextLength=8191,
                costPer1kTokensInput=0.00013,  # $0.13/M tokens
                costPer1kTokensOutput=0.0,
                speedRating=9,
                qualityRating=10,
                functionCall=self.callEmbedding,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.ADVANCED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.EMBEDDING, 10)
                ),
                version="text-embedding-3-large",
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.00013
            ),
            AiModel(
                name="dall-e-3",
                displayName="OpenAI DALL-E 3",
                connectorType="openai",
                apiUrl="https://api.openai.com/v1/images/generations",
                temperature=0.0,  # Image generation doesn't use temperature
                maxTokens=0,  # Image generation doesn't use tokens
                contextLength=0,
                costPer1kTokensInput=0.04,
                costPer1kTokensOutput=0.0,
                speedRating=5,  # Slow for image generation
                qualityRating=9,  # High quality art generation
                # capabilities removed (not used in business logic)
                functionCall=self.generateImage,
                priority=PriorityEnum.QUALITY,
                processingMode=ProcessingModeEnum.DETAILED,
                operationTypes=createOperationTypeRatings(
                    (OperationTypeEnum.IMAGE_GENERATE, 10)
                ),
                version="dall-e-3",
                calculatepriceCHF=lambda processingTime, bytesSent, bytesReceived: (bytesSent / 4 / 1000) * 0.04
            )
        ]
async def callAiBasic(self, modelCall: AiModelCall) -> AiModelResponse:
"""
Calls the OpenAI API with the given messages using standardized pattern.
Args:
modelCall: AiModelCall with messages and options
Returns:
AiModelResponse with content and metadata
Raises:
HTTPException: For errors in API communication
"""
try:
# Extract parameters from modelCall
messages = modelCall.messages
model = modelCall.model
options = modelCall.options
temperature = getattr(options, "temperature", None)
if temperature is None:
temperature = model.temperature
maxTokens = model.maxTokens
payload = {
"model": model.name,
"messages": messages,
"temperature": temperature,
"max_tokens": maxTokens
}
if modelCall.tools:
payload["tools"] = modelCall.tools
payload["tool_choice"] = modelCall.toolChoice or "auto"
response = await self.httpClient.post(
model.apiUrl,
json=payload
)
if response.status_code != 200:
error_message = f"OpenAI API error: {response.status_code} - {response.text}"
logger.error(error_message)
# Check for rate limit exceeded (429 TPM)
if response.status_code == 429:
try:
error_data = response.json()
error_msg = error_data.get("error", {}).get("message", "Rate limit exceeded")
raise RateLimitExceededException(
f"Rate limit exceeded for {model.name}: {error_msg}"
)
except (ValueError, KeyError):
raise RateLimitExceededException(
f"Rate limit exceeded for {model.name}"
)
# Check for context length exceeded error
if response.status_code == 400:
try:
error_data = response.json()
if (error_data.get("error", {}).get("code") == "context_length_exceeded" or
"context length" in error_data.get("error", {}).get("message", "").lower()):
# Raise a specific exception for context length issues
raise ContextLengthExceededException(
f"Context length exceeded: {error_data.get('error', {}).get('message', 'Unknown error')}"
)
except (ValueError, KeyError):
pass # If we can't parse the error, fall through to generic error
# Include the actual error details in the exception
raise HTTPException(status_code=500, detail=error_message)
responseJson = response.json()
choiceMessage = responseJson["choices"][0]["message"]
content = choiceMessage.get("content") or ""
metadata = {"response_id": responseJson.get("id", "")}
if choiceMessage.get("tool_calls"):
metadata["toolCalls"] = choiceMessage["tool_calls"]
return AiModelResponse(
content=content,
success=True,
modelId=model.name,
metadata=metadata
)
except ContextLengthExceededException:
raise
except Exception as e:
logger.error(f"Error calling OpenAI API: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error calling OpenAI API: {str(e)}")
    async def callAiBasicStream(self, modelCall: AiModelCall) -> AsyncGenerator[Union[str, AiModelResponse], None]:
        """Stream OpenAI response. Yields str deltas, then final AiModelResponse.

        Opens a server-sent-events stream against the chat completions endpoint,
        yields each text delta as it arrives, accumulates any tool-call fragments
        by index, and finally yields one AiModelResponse containing the full
        concatenated content (plus accumulated tool calls in metadata).

        Raises:
            RateLimitExceededException: On HTTP 429 from OpenAI.
            HTTPException: On any other non-200 status or unexpected error.
        """
        try:
            messages = modelCall.messages
            model = modelCall.model
            options = modelCall.options
            # Per-call temperature override; fall back to the model's default.
            temperature = getattr(options, "temperature", None)
            if temperature is None:
                temperature = model.temperature
            payload: Dict[str, Any] = {
                "model": model.name,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": model.maxTokens,
                "stream": True,
            }
            if modelCall.tools:
                payload["tools"] = modelCall.tools
                payload["tool_choice"] = modelCall.toolChoice or "auto"
            fullContent = ""
            # Tool-call fragments arrive spread over many chunks; accumulate them
            # keyed by their stream index so arguments can be concatenated.
            toolCallsAccum: Dict[int, Dict[str, Any]] = {}
            async with self.httpClient.stream("POST", model.apiUrl, json=payload) as response:
                if response.status_code != 200:
                    # Error bodies must be read explicitly in streaming mode.
                    body = await response.aread()
                    bodyStr = body.decode()
                    if response.status_code == 429:
                        try:
                            errorMsg = _json.loads(bodyStr).get("error", {}).get("message", "Rate limit exceeded")
                        except (ValueError, KeyError):
                            errorMsg = f"Rate limit exceeded for {model.name}"
                        raise RateLimitExceededException(f"Rate limit exceeded for {model.name}: {errorMsg}")
                    raise HTTPException(status_code=500, detail=f"OpenAI stream error: {response.status_code} - {bodyStr}")
                async for line in response.aiter_lines():
                    # SSE frames carry the payload after a "data: " prefix.
                    if not line.startswith("data: "):
                        continue
                    data = line[6:]
                    # "[DONE]" is OpenAI's end-of-stream sentinel.
                    if data.strip() == "[DONE]":
                        break
                    try:
                        chunk = _json.loads(data)
                    except _json.JSONDecodeError:
                        # Skip malformed/partial frames rather than aborting the stream.
                        continue
                    delta = chunk.get("choices", [{}])[0].get("delta", {})
                    if "content" in delta and delta["content"]:
                        fullContent += delta["content"]
                        yield delta["content"]
                    for tcDelta in delta.get("tool_calls", []):
                        idx = tcDelta.get("index", 0)
                        if idx not in toolCallsAccum:
                            toolCallsAccum[idx] = {
                                "id": tcDelta.get("id", ""),
                                "type": "function",
                                "function": {"name": "", "arguments": ""},
                            }
                        if tcDelta.get("id"):
                            toolCallsAccum[idx]["id"] = tcDelta["id"]
                        fn = tcDelta.get("function", {})
                        if fn.get("name"):
                            toolCallsAccum[idx]["function"]["name"] = fn["name"]
                        if fn.get("arguments"):
                            # Arguments stream as string fragments; append them.
                            toolCallsAccum[idx]["function"]["arguments"] += fn["arguments"]
            metadata: Dict[str, Any] = {}
            if toolCallsAccum:
                # Emit tool calls in stream-index order.
                metadata["toolCalls"] = [toolCallsAccum[i] for i in sorted(toolCallsAccum)]
            yield AiModelResponse(
                content=fullContent,
                success=True,
                modelId=model.name,
                metadata=metadata,
            )
        except (RateLimitExceededException, ContextLengthExceededException, HTTPException):
            raise
        except Exception as e:
            logger.error(f"Error streaming OpenAI API: {e}")
            raise HTTPException(status_code=500, detail=f"Error streaming OpenAI API: {e}")
async def callEmbedding(self, modelCall: AiModelCall) -> AiModelResponse:
"""Generate embeddings via the OpenAI Embeddings API.
Reads texts from modelCall.embeddingInput.
Returns vectors in metadata["embeddings"].
"""
try:
model = modelCall.model
texts = modelCall.embeddingInput or []
if not texts:
return AiModelResponse(
content="", success=False, error="No embeddingInput provided"
)
payload = {"model": model.name, "input": texts}
response = await self.httpClient.post(model.apiUrl, json=payload)
if response.status_code != 200:
errorMessage = f"OpenAI Embedding API error: {response.status_code} - {response.text}"
logger.error(errorMessage)
if response.status_code == 429:
raise RateLimitExceededException(f"Rate limit exceeded for {model.name}")
if response.status_code == 400:
try:
errorData = response.json()
errMsg = errorData.get("error", {}).get("message", "").lower()
errCode = errorData.get("error", {}).get("code", "")
if errCode == "context_length_exceeded" or "too many tokens" in errMsg or "maximum context length" in errMsg:
raise ContextLengthExceededException(
f"Embedding context length exceeded for {model.name}: {errorData.get('error', {}).get('message', '')}"
)
except (ValueError, KeyError):
pass
raise HTTPException(status_code=500, detail=errorMessage)
responseJson = response.json()
embeddings = [item["embedding"] for item in responseJson["data"]]
usage = responseJson.get("usage", {})
return AiModelResponse(
content="",
success=True,
modelId=model.name,
tokensUsed={
"input": usage.get("prompt_tokens", 0),
"output": 0,
"total": usage.get("total_tokens", 0),
},
metadata={"embeddings": embeddings},
)
except (RateLimitExceededException, ContextLengthExceededException):
raise
except Exception as e:
logger.error(f"Error calling OpenAI Embedding API: {str(e)}")
raise HTTPException(status_code=500, detail=f"Error calling OpenAI Embedding API: {str(e)}")
async def callAiImage(self, modelCall: AiModelCall) -> AiModelResponse:
"""
Analyzes an image with the OpenAI Vision API using standardized pattern.
Args:
modelCall: AiModelCall with messages and image data in options
Returns:
AiModelResponse with analysis content
"""
try:
# Extract parameters from modelCall
messages = modelCall.messages
model = modelCall.model
# Messages should already be in the correct format with image data embedded
# Just verify they contain image data
if not messages or not messages[0].get("content"):
raise ValueError("No messages provided for image analysis")
logger.debug(f"Starting image analysis with {len(messages)} message(s)...")
# Use the messages directly - they should already contain the image data
# in the format: {"type": "image_url", "image_url": {"url": "data:...base64,..."}}
# Use parameters from model
temperature = model.temperature
# Don't set maxTokens - let the model use its full context length
payload = {
"model": model.name,
"messages": messages,
"temperature": temperature
}
response = await self.httpClient.post(
model.apiUrl,
json=payload
)
if response.status_code != 200:
logger.error(f"OpenAI API error: {response.status_code} - {response.text}")
raise HTTPException(status_code=500, detail="Error communicating with OpenAI API")
responseJson = response.json()
content = responseJson["choices"][0]["message"]["content"]
return AiModelResponse(
content=content,
success=True,
modelId=model.name,
metadata={"response_id": responseJson.get("id", "")}
)
except Exception as e:
logger.error(f"Error during image analysis: {str(e)}", exc_info=True)
return AiModelResponse(
content="",
success=False,
error=f"Error during image analysis: {str(e)}"
)
async def generateImage(self, modelCall: AiModelCall) -> AiModelResponse:
"""
Generate an image using DALL-E 3 using standardized pattern.
Args:
modelCall: AiModelCall with messages and generation options
Returns:
AiModelResponse with generated image data
"""
try:
# Extract parameters from modelCall
messages = modelCall.messages
model = modelCall.model
options = modelCall.options
# Get prompt from messages
promptContent = messages[0]["content"] if messages else ""
# Parse prompt using AiCallPromptImage model
import json
try:
# Try to parse as JSON
promptData = json.loads(promptContent)
promptModel = AiCallPromptImage(**promptData)
except:
# If not JSON, use plain text prompt
promptModel = AiCallPromptImage(
prompt=promptContent,
size=options.size if options and hasattr(options, 'size') else "1024x1024",
quality=options.quality if options and hasattr(options, 'quality') else "standard",
style=options.style if options and hasattr(options, 'style') else "vivid"
)
# Extract parameters from Pydantic model
prompt = promptModel.prompt
size = promptModel.size or "1024x1024"
quality = promptModel.quality or "standard"
style = promptModel.style or "vivid"
logger.debug(f"Starting image generation with prompt: '{prompt[:100]}...'")
# DALL-E 3 API endpoint
dalle_url = "https://api.openai.com/v1/images/generations"
payload = {
"model": "dall-e-3",
"prompt": prompt,
"size": size,
"quality": quality,
"style": style,
"n": 1,
"response_format": "b64_json" # Get base64 data directly instead of URLs
}
# Use existing httpClient to benefit from connection pooling
# This avoids TLS connection issues that can occur with fresh clients
response = await self.httpClient.post(
dalle_url,
json=payload
)
if response.status_code != 200:
logger.error(f"DALL-E API error: {response.status_code} - {response.text}")
return AiModelResponse(
content="",
success=False,
error=f"DALL-E API error: {response.status_code} - {response.text}"
)
responseJson = response.json()
if "data" in responseJson and len(responseJson["data"]) > 0:
image_data = responseJson["data"][0]["b64_json"]
logger.info(f"Successfully generated image: {len(image_data)} characters")
return AiModelResponse(
content=image_data,
success=True,
modelId="dall-e-3",
metadata={
"size": size,
"quality": quality,
"style": style,
"response_id": responseJson.get("id", "")
}
)
else:
logger.error("No image data in DALL-E response")
return AiModelResponse(
content="",
success=False,
error="No image data in DALL-E response"
)
except Exception as e:
logger.error(f"Error during image generation: {str(e)}", exc_info=True)
return AiModelResponse(
content="",
success=False,
error=f"Error during image generation: {str(e)}"
)