# gateway/modules/interfaces/interfaceAiCalls.py
# Snapshot metadata: 2025-09-02 21:11:32 +02:00 — 527 lines, 20 KiB, Python.
import logging
from typing import Dict, Any, List, Union, Optional
from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
from modules.connectors.connectorAiAnthropic import AiAnthropic
from modules.chat.documents.documentExtraction import DocumentExtraction
from modules.interfaces.interfaceChatModel import ChatDocument
logger = logging.getLogger(__name__)
# AI Model Registry with Performance Data
# Each entry drives model selection in AiCalls._select_optimal_model.
# Ratings are on a 1-10 scale; costs are USD per 1k tokens; "connector" is
# resolved dynamically to the attribute f"{connector}Service" on AiCalls.
AI_MODELS = {
    "openai_gpt4o": {
        "connector": "openai",
        "max_tokens": 128000,
        "cost_per_1k_tokens": 0.03, # Input
        "cost_per_1k_tokens_output": 0.06, # Output
        "speed_rating": 8, # 1-10
        "quality_rating": 9, # 1-10
        "supports_images": True,
        "supports_documents": True,
        "context_length": 128000,
        "model_name": "gpt-4o"
    },
    "openai_gpt35": {
        # NOTE: used throughout this module as the hard-coded fallback model.
        "connector": "openai",
        "max_tokens": 16000,
        "cost_per_1k_tokens": 0.0015,
        "cost_per_1k_tokens_output": 0.002,
        "speed_rating": 9,
        "quality_rating": 7,
        "supports_images": False,
        "supports_documents": True,
        "context_length": 16000,
        "model_name": "gpt-3.5-turbo"
    },
    "anthropic_claude": {
        "connector": "anthropic",
        "max_tokens": 200000,
        "cost_per_1k_tokens": 0.015,
        "cost_per_1k_tokens_output": 0.075,
        "speed_rating": 7,
        "quality_rating": 10,
        "supports_images": True,
        "supports_documents": True,
        "context_length": 200000,
        "model_name": "claude-3-sonnet-20240229"
    }
}
class AiCalls:
    """Interface for AI service interactions with a centralized call method.

    Wraps the OpenAI and Anthropic connectors behind one entry point
    (``callAi``) plus legacy and convenience wrappers.
    """
    def __init__(self):
        # Provider connectors. The attribute names matter: other methods in
        # this class resolve them dynamically via
        # getattr(self, f"{connector}Service").
        self.openaiService = AiOpenai()
        self.anthropicService = AiAnthropic()
        # Extracts text content from uploaded documents before AI calls.
        self.document_extractor = DocumentExtraction()
async def callAi(
    self,
    prompt: str,
    documents: List[ChatDocument] = None,
    operation_type: str = "general",
    priority: str = "balanced", # "speed", "quality", "cost", "balanced"
    compress_prompt: bool = True,
    compress_documents: bool = True,
    process_documents_individually: bool = False,
    max_cost: float = None,
    max_processing_time: int = None
) -> str:
    """Central AI call: pick a model, prepare content, execute with failover.

    Args:
        prompt: The main prompt for the AI.
        documents: Documents whose extracted text is supplied as context.
        operation_type: Kind of operation ("general", "document_analysis",
            "image_analysis", ...).
        priority: Model-selection priority ("speed", "quality", "cost",
            "balanced").
        compress_prompt: Whether the prompt may be compressed.
        compress_documents: Whether document content may be compressed.
        process_documents_individually: Whether documents are processed
            one by one.
        max_cost: Optional cost ceiling for this call.
        max_processing_time: Optional time ceiling in seconds.

    Returns:
        The AI response text, or an "Error: ..." string on failure.
    """
    try:
        # Step 1: turn any attached documents into a single context string.
        context_text = ""
        if documents:
            context_text = await self._process_documents_for_ai(
                documents,
                operation_type,
                compress_documents,
                process_documents_individually,
            )
        # Step 2: choose the model that best matches priority and limits.
        model_key = self._select_optimal_model(
            prompt,
            context_text,
            priority,
            operation_type,
            max_cost,
            max_processing_time,
        )
        # Step 3: shrink prompt/context to fit the chosen model's window.
        final_prompt, final_context = await self._optimize_content_for_model(
            prompt,
            context_text,
            model_key,
            compress_prompt,
            compress_documents,
        )
        # Step 4: execute, falling back to other models on failure.
        return await self._execute_ai_call_with_failover(
            model_key,
            final_prompt,
            final_context,
        )
    except Exception as e:
        logger.error(f"Error in centralized AI call: {str(e)}")
        return f"Error: {str(e)}"
def _select_optimal_model(
    self,
    prompt: str,
    document_content: str,
    priority: str,
    operation_type: str,
    max_cost: float = None,
    max_processing_time: int = None
) -> str:
    """Select the best AI_MODELS key for this request.

    Filters models by content size, cost ceiling and operation type, then
    ranks the survivors according to *priority*.

    Args:
        prompt: Main prompt text.
        document_content: Already-extracted document context.
        priority: "speed", "quality", "cost"; anything else means "balanced".
        operation_type: "image_analysis" restricts to image-capable models.
        max_cost: Optional hard cost limit in USD.
        max_processing_time: Accepted for API compatibility; currently unused.

    Returns:
        A key of AI_MODELS; falls back to "openai_gpt35" when nothing fits.
    """
    # Content size in UTF-8 bytes, used as a rough proxy for token load.
    total_content_size = len(prompt.encode('utf-8')) + len(document_content.encode('utf-8'))
    # Filter the registry down to models that can serve this request.
    available_models = {}
    for model_name, model_info in AI_MODELS.items():
        # Reserve 20% of the context window for the model's answer.
        if total_content_size > model_info["context_length"] * 0.8:
            continue
        # BUGFIX: test `is not None` instead of truthiness so an explicit
        # budget of 0.0 is honored rather than silently ignored.
        if max_cost is not None:
            estimated_cost = self._estimate_cost(model_info, total_content_size)
            if estimated_cost > max_cost:
                continue
        # Operation-type compatibility.
        if operation_type == "image_analysis" and not model_info["supports_images"]:
            continue
        available_models[model_name] = model_info
    if not available_models:
        # Fallback to the smallest/cheapest model.
        return "openai_gpt35"
    # Rank by the requested priority.
    if priority == "speed":
        return max(available_models, key=lambda m: available_models[m]["speed_rating"])
    if priority == "quality":
        return max(available_models, key=lambda m: available_models[m]["quality_rating"])
    if priority == "cost":
        return min(available_models, key=lambda m: available_models[m]["cost_per_1k_tokens"])
    # "balanced": weighted score — 40% quality, 30% speed, 30% (inverse) cost.
    def balanced_score(model_name):
        model_info = available_models[model_name]
        quality_score = model_info["quality_rating"] * 0.4
        speed_score = model_info["speed_rating"] * 0.3
        # Lower cost yields a higher score (can go negative for pricey models).
        cost_score = (10 - (model_info["cost_per_1k_tokens"] * 1000)) * 0.3
        return quality_score + speed_score + cost_score
    return max(available_models, key=balanced_score)
def _estimate_cost(self, model_info: Dict, content_size: int) -> float:
"""Schätzt die Kosten für einen AI Call"""
# Grobe Schätzung: 1 Token ≈ 4 Zeichen
estimated_tokens = content_size / 4
input_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens"]
output_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens_output"] * 0.1 # 10% für Output
return input_cost + output_cost
async def _process_documents_for_ai(
    self,
    documents: List[ChatDocument],
    operation_type: str,
    compress_documents: bool,
    process_individually: bool
) -> str:
    """Turn documents into one context string via DocumentExtraction.

    Each document is run through processFileData; the non-empty extracted
    content items are joined, optionally AI-compressed past 10KB, and
    labelled with the file name. A failing document is recorded inline as
    an error entry instead of aborting the whole batch.

    NOTE(review): `process_individually` is accepted but never read here —
    confirm whether per-document processing was meant to be implemented.
    """
    if not documents:
        return ""
    processed_contents = []
    for doc in documents:
        try:
            # Extract content with documentExtraction.py.
            extracted = await self.document_extractor.processFileData(
                doc.fileData,
                doc.fileName,
                doc.mimeType,
                prompt=f"Extract relevant content for {operation_type}",
                documentId=doc.id,
                enableAI=True
            )
            # Combine all non-empty content items of this document.
            doc_content = []
            for content_item in extracted.contents:
                if content_item.data and content_item.data.strip():
                    doc_content.append(content_item.data)
            if doc_content:
                combined_doc_content = "\n\n".join(doc_content)
                # Compress if requested and larger than the 10KB limit.
                if compress_documents and len(combined_doc_content.encode('utf-8')) > 10000:
                    combined_doc_content = await self._compress_content(
                        combined_doc_content,
                        10000,
                        "document"
                    )
                processed_contents.append(f"Document: {doc.fileName}\n{combined_doc_content}")
        except Exception as e:
            logger.warning(f"Error processing document {doc.fileName}: {str(e)}")
            processed_contents.append(f"Document: {doc.fileName}\n[Error processing document: {str(e)}]")
    return "\n\n---\n\n".join(processed_contents)
async def _optimize_content_for_model(
    self,
    prompt: str,
    document_content: str,
    model_name: str,
    compress_prompt: bool,
    compress_documents: bool
) -> tuple[str, str]:
    """Fit prompt and document context into the chosen model's window.

    The prompt is compressed when it exceeds ~2KB; the document context is
    compressed down to 70% of the model's context length.

    Returns:
        (optimized_prompt, optimized_document_content)
    """
    # Share of the context window reserved for document content.
    content_budget = AI_MODELS[model_name]["context_length"] * 0.7
    final_prompt = prompt
    final_context = document_content
    # Prompt: only compressed past the 2KB threshold.
    if compress_prompt and len(prompt.encode('utf-8')) > 2000:
        final_prompt = await self._compress_content(prompt, 2000, "prompt")
    # Document context: compressed down to the model's content budget.
    if compress_documents and document_content:
        if len(document_content.encode('utf-8')) > content_budget:
            final_context = await self._compress_content(
                document_content,
                int(content_budget),
                "document",
            )
    return final_prompt, final_context
async def _compress_content(self, content: str, target_size: int, content_type: str) -> str:
    """Compress *content* toward *target_size* using a cheap AI model.

    Falls back to plain truncation when the AI compression call fails.

    NOTE(review): the size guard counts UTF-8 bytes, while the prompt asks
    the model for characters and the truncation fallback slices characters;
    these only coincide for pure-ASCII content — confirm intended unit.
    """
    if len(content.encode('utf-8')) <= target_size:
        return content
    try:
        # Use AI for intelligent compression. The German prompt text below is
        # runtime behavior, not documentation, and is left unchanged.
        compression_prompt = f"""
Komprimiere den folgenden {content_type} auf maximal {target_size} Zeichen,
behalte aber alle wichtigen Informationen bei:
{content}
Gib nur den komprimierten Inhalt zurück, ohne zusätzliche Erklärungen.
"""
        # Use the fastest available model for compression.
        compression_model = "openai_gpt35"
        model_info = AI_MODELS[compression_model]
        connector = getattr(self, f"{model_info['connector']}Service")
        messages = [{"role": "user", "content": compression_prompt}]
        if model_info["connector"] == "openai":
            compressed = await connector.callAiBasic(messages)
        else:
            # Defensive branch: unreachable while compression_model is
            # hard-coded to an OpenAI model above.
            response = await connector.callAiBasic(messages)
            compressed = response["choices"][0]["message"]["content"]
        return compressed
    except Exception as e:
        logger.warning(f"AI compression failed, using truncation: {str(e)}")
        # Fallback: simple character truncation.
        return content[:target_size] + "... [truncated]"
async def _execute_ai_call_with_failover(
    self,
    model_name: str,
    prompt: str,
    document_content: str
) -> str:
    """Run the AI call for *model_name* with automatic failover.

    On a context-length error it escalates to a larger-context model, or —
    when none exists — compresses the content and retries on the smallest
    model. On any other error it retries once on "openai_gpt35".

    Raises:
        Exception: the original error is re-raised when the fallback model
            itself fails (the previous implementation recursed with the same
            arguments forever in that case).
    """
    fallback_model_name = "openai_gpt35"
    try:
        model_info = AI_MODELS[model_name]
        connector = getattr(self, f"{model_info['connector']}Service")
        # Document context goes in as a system message, the prompt as user.
        messages = []
        if document_content:
            messages.append({
                "role": "system",
                "content": f"Context from documents:\n{document_content}"
            })
        messages.append({
            "role": "user",
            "content": prompt
        })
        # Execute the AI call; connectors return different response shapes.
        if model_info["connector"] == "openai":
            return await connector.callAiBasic(messages)
        else:  # anthropic
            response = await connector.callAiBasic(messages)
            return response["choices"][0]["message"]["content"]
    except ContextLengthExceededException:
        logger.warning(f"Context length exceeded for {model_name}, trying fallback")
        # Prefer a model with a larger context window.
        fallback = self._find_fallback_model(model_name)
        if fallback:
            return await self._execute_ai_call_with_failover(fallback, prompt, document_content)
        # Last resort: compress the content further and use the smallest model.
        compressed_prompt = await self._compress_content(prompt, 1000, "prompt")
        compressed_content = await self._compress_content(document_content, 5000, "document")
        return await self._execute_ai_call_with_failover(fallback_model_name, compressed_prompt, compressed_content)
    except Exception as e:
        logger.warning(f"AI call failed with {model_name}: {e}")
        # BUGFIX: only fall back when we are not already on the fallback
        # model. The original retried "openai_gpt35" with identical arguments
        # on every failure, recursing without bound whenever that call kept
        # raising. Re-raising lets callAi's handler report the error instead.
        if model_name != fallback_model_name:
            return await self._execute_ai_call_with_failover(fallback_model_name, prompt, document_content)
        raise
def _find_fallback_model(self, current_model: str) -> Optional[str]:
    """Return the first registry key whose context window is larger than
    *current_model*'s, or None when no such model exists."""
    threshold = AI_MODELS[current_model]["context_length"]
    bigger = (
        name
        for name, info in AI_MODELS.items()
        if info["context_length"] > threshold
    )
    return next(bigger, None)
# Legacy methods
async def callAiTextBasic(self, prompt: str, context: Optional[str] = None) -> str:
    """Basic text processing via the centralized callAi (speed priority).

    Args:
        prompt: The user prompt to process.
        context: Optional system context prepended to the prompt.

    Returns:
        The AI response as text.
    """
    # Fold the optional context into the prompt before delegating.
    if context:
        prompt = f"Context: {context}\n\nUser Request: {prompt}"
    return await self.callAi(
        prompt=prompt,
        priority="speed",
        compress_prompt=True,
        compress_documents=False
    )
async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str:
    """Advanced text processing via the centralized callAi (quality priority).

    Args:
        prompt: The user prompt to process.
        context: Optional system context prepended to the prompt.
        _is_fallback: Unused; kept only for caller compatibility.

    Returns:
        The AI response as text.
    """
    full_prompt = (
        f"Context: {context}\n\nUser Request: {prompt}" if context else prompt
    )
    return await self.callAi(
        prompt=full_prompt,
        priority="quality",
        compress_prompt=False,
        compress_documents=False
    )
async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
    """Basic image analysis, delegated directly to the OpenAI connector.

    The centralized callAi path does not handle images yet, so this calls
    the provider connector straight through.

    Args:
        prompt: The prompt for image analysis.
        imageData: The image data (file path or bytes).
        mimeType: Optional MIME type of the image.

    Returns:
        The AI response text, or an "Error: ..." string on failure.
    """
    try:
        result = await self.openaiService.callAiImage(prompt, imageData, mimeType)
    except Exception as e:
        logger.error(f"Error in OpenAI image call: {str(e)}")
        return f"Error: {str(e)}"
    return result
async def callAiImageAdvanced(self, prompt: str, imageData: Union[str, bytes], mimeType: str = None) -> str:
    """Advanced image analysis, delegated directly to the Anthropic connector.

    The centralized callAi path does not handle images yet, so this calls
    the provider connector straight through.

    Args:
        prompt: The prompt for image analysis.
        imageData: The image data (file path or bytes).
        mimeType: Optional MIME type of the image.

    Returns:
        The AI response text, or an "Error: ..." string on failure.
    """
    try:
        result = await self.anthropicService.callAiImage(prompt, imageData, mimeType)
    except Exception as e:
        logger.error(f"Error in Anthropic image call: {str(e)}")
        return f"Error: {str(e)}"
    return result
# Convenience methods for common use cases
async def callAiForDocumentAnalysis(
    self,
    prompt: str,
    documents: List[ChatDocument],
    priority: str = "balanced"
) -> str:
    """Convenience wrapper: analyze *documents* as one batch with compression."""
    call_kwargs = dict(
        prompt=prompt,
        documents=documents,
        operation_type="document_analysis",
        priority=priority,
        compress_documents=True,
        process_documents_individually=False,
    )
    return await self.callAi(**call_kwargs)
async def callAiForReportGeneration(
    self,
    prompt: str,
    documents: List[ChatDocument],
    priority: str = "quality"
) -> str:
    """Convenience wrapper: generate a report, processing documents one by one."""
    call_kwargs = dict(
        prompt=prompt,
        documents=documents,
        operation_type="report_generation",
        priority=priority,
        compress_documents=True,
        process_documents_individually=True,
    )
    return await self.callAi(**call_kwargs)
async def callAiForEmailComposition(
    self,
    prompt: str,
    documents: List[ChatDocument] = None,
    priority: str = "speed"
) -> str:
    """Convenience wrapper: compose an email with prompt and document compression."""
    call_kwargs = dict(
        prompt=prompt,
        documents=documents,
        operation_type="email_composition",
        priority=priority,
        compress_prompt=True,
        compress_documents=True,
    )
    return await self.callAi(**call_kwargs)
async def callAiForTaskPlanning(
    self,
    prompt: str,
    documents: List[ChatDocument] = None,
    priority: str = "balanced"
) -> str:
    """Convenience wrapper: plan tasks with an uncompressed prompt."""
    call_kwargs = dict(
        prompt=prompt,
        documents=documents,
        operation_type="task_planning",
        priority=priority,
        compress_prompt=False,
        compress_documents=True,
    )
    return await self.callAi(**call_kwargs)