import logging
from typing import Dict, Any, List, Union, Optional

from modules.connectors.connectorAiOpenai import AiOpenai, ContextLengthExceededException
from modules.connectors.connectorAiAnthropic import AiAnthropic
from modules.chat.documents.documentExtraction import DocumentExtraction
from modules.interfaces.interfaceChatModel import ChatDocument

logger = logging.getLogger(__name__)

# AI Model Registry with Performance Data
AI_MODELS = {
    "openai_gpt4o": {
        "connector": "openai",
        "max_tokens": 128000,
        "cost_per_1k_tokens": 0.03,  # Input
        "cost_per_1k_tokens_output": 0.06,  # Output
        "speed_rating": 8,  # 1-10
        "quality_rating": 9,  # 1-10
        "supports_images": True,
        "supports_documents": True,
        "context_length": 128000,
        "model_name": "gpt-4o"
    },
    "openai_gpt35": {
        "connector": "openai",
        "max_tokens": 16000,
        "cost_per_1k_tokens": 0.0015,
        "cost_per_1k_tokens_output": 0.002,
        "speed_rating": 9,
        "quality_rating": 7,
        "supports_images": False,
        "supports_documents": True,
        "context_length": 16000,
        "model_name": "gpt-3.5-turbo"
    },
    "anthropic_claude": {
        "connector": "anthropic",
        "max_tokens": 200000,
        "cost_per_1k_tokens": 0.015,
        "cost_per_1k_tokens_output": 0.075,
        "speed_rating": 7,
        "quality_rating": 10,
        "supports_images": True,
        "supports_documents": True,
        "context_length": 200000,
        "model_name": "claude-3-sonnet-20240229"
    }
}


class AiCalls:
    """Interface for AI service interactions with a centralized call method"""

    def __init__(self):
        self.openaiService = AiOpenai()
        self.anthropicService = AiAnthropic()
        self.document_extractor = DocumentExtraction()

    async def callAi(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]] = None,
        operation_type: str = "general",
        priority: str = "balanced",  # "speed", "quality", "cost", "balanced"
        compress_prompt: bool = True,
        compress_documents: bool = True,
        process_documents_individually: bool = False,
        max_cost: Optional[float] = None,
        max_processing_time: Optional[int] = None
    ) -> str:
        """
        Centralized AI call method with intelligent model selection and content processing.

        Args:
            prompt: The main prompt for the AI
            documents: List of documents to process
            operation_type: Type of operation ("general", "document_analysis", "image_analysis", etc.)
            priority: Priority for model selection ("speed", "quality", "cost", "balanced")
            compress_prompt: Whether the prompt should be compressed
            compress_documents: Whether documents should be compressed
            process_documents_individually: Whether documents should be processed individually
            max_cost: Maximum cost for the call
            max_processing_time: Maximum processing time in seconds (currently not enforced)

        Returns:
            The AI response as a string
        """
        try:
            # 1. Process documents if present
            document_content = ""
            if documents:
                document_content = await self._process_documents_for_ai(
                    documents,
                    operation_type,
                    compress_documents,
                    process_documents_individually
                )

            # 2. Select the best model based on priority and content
            selected_model = self._select_optimal_model(
                prompt,
                document_content,
                priority,
                operation_type,
                max_cost,
                max_processing_time
            )

            # 3. Optimize the content for the selected model
            optimized_prompt, optimized_content = await self._optimize_content_for_model(
                prompt,
                document_content,
                selected_model,
                compress_prompt,
                compress_documents
            )

            # 4. Execute the AI call with failover
            return await self._execute_ai_call_with_failover(
                selected_model,
                optimized_prompt,
                optimized_content
            )

        except Exception as e:
            logger.error(f"Error in centralized AI call: {str(e)}")
            return f"Error: {str(e)}"

    def _select_optimal_model(
        self,
        prompt: str,
        document_content: str,
        priority: str,
        operation_type: str,
        max_cost: Optional[float] = None,
        max_processing_time: Optional[int] = None
    ) -> str:
        """Selects the optimal model based on priority and content"""

        # Compute the content size in bytes (used as a rough proxy for tokens)
        total_content_size = len(prompt.encode('utf-8')) + len(document_content.encode('utf-8'))

        # Filter the available models
        available_models = {}
        for model_name, model_info in AI_MODELS.items():
            # Check whether the model can fit the content
            if total_content_size > model_info["context_length"] * 0.8:  # 80% reserved for content
                continue

            # Check the cost limit
            if max_cost:
                estimated_cost = self._estimate_cost(model_info, total_content_size)
                if estimated_cost > max_cost:
                    continue

            # Check operation-type compatibility
            if operation_type == "image_analysis" and not model_info["supports_images"]:
                continue

            # NOTE: max_processing_time is accepted but not yet applied as a filter
            available_models[model_name] = model_info

        if not available_models:
            # Fall back to the smallest model
            return "openai_gpt35"

        # Select the model based on priority
        if priority == "speed":
            return max(available_models.keys(), key=lambda x: available_models[x]["speed_rating"])
        elif priority == "quality":
            return max(available_models.keys(), key=lambda x: available_models[x]["quality_rating"])
        elif priority == "cost":
            return min(available_models.keys(), key=lambda x: available_models[x]["cost_per_1k_tokens"])
        else:  # balanced
            # Weighted score: 40% quality, 30% speed, 30% cost
            def balanced_score(model_name):
                model_info = available_models[model_name]
                quality_score = model_info["quality_rating"] * 0.4
                speed_score = model_info["speed_rating"] * 0.3
                # Lower cost = higher score; can go negative for expensive models
                cost_score = (10 - (model_info["cost_per_1k_tokens"] * 1000)) * 0.3
                return quality_score + speed_score + cost_score

            return max(available_models.keys(), key=balanced_score)
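
    # Worked example of the balanced score against the registry above (hand
    # arithmetic, not output from a run): "openai_gpt35" scores
    # 7*0.4 + 9*0.3 + (10 - 1.5)*0.3 = 2.8 + 2.7 + 2.55 = 8.05, "openai_gpt4o"
    # scores 3.6 + 2.4 + (10 - 30)*0.3 = 0.0, and "anthropic_claude" scores
    # 4.0 + 2.1 + (10 - 15)*0.3 = 4.6. The cost term dominates, so "balanced"
    # currently favors the cheapest model.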

    def _estimate_cost(self, model_info: Dict, content_size: int) -> float:
        """Estimates the cost of an AI call"""
        # Rough estimate: 1 token ≈ 4 characters
        estimated_tokens = content_size / 4
        input_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens"]
        output_cost = (estimated_tokens / 1000) * model_info["cost_per_1k_tokens_output"] * 0.1  # assume output ≈ 10% of input
        return input_cost + output_cost
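
    # Worked example (hand arithmetic, using the rates in AI_MODELS): 8,000
    # bytes of content ≈ 2,000 tokens; for "openai_gpt4o" this estimates
    # 2 * 0.03 + 2 * 0.06 * 0.1 = 0.06 + 0.012 ≈ $0.072 per call.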

    async def _process_documents_for_ai(
        self,
        documents: List[ChatDocument],
        operation_type: str,
        compress_documents: bool,
        process_individually: bool
    ) -> str:
        """Processes documents for an AI call using documentExtraction.py"""

        if not documents:
            return ""

        # NOTE: process_individually is currently not honored; all documents
        # are combined into a single context block.
        processed_contents = []

        for doc in documents:
            try:
                # Extract content via documentExtraction.py
                extracted = await self.document_extractor.processFileData(
                    doc.fileData,
                    doc.fileName,
                    doc.mimeType,
                    prompt=f"Extract relevant content for {operation_type}",
                    documentId=doc.id,
                    enableAI=True
                )

                # Combine all content items
                doc_content = []
                for content_item in extracted.contents:
                    if content_item.data and content_item.data.strip():
                        doc_content.append(content_item.data)

                if doc_content:
                    combined_doc_content = "\n\n".join(doc_content)

                    # Compress if requested
                    if compress_documents and len(combined_doc_content.encode('utf-8')) > 10000:  # 10 KB limit
                        combined_doc_content = await self._compress_content(
                            combined_doc_content,
                            10000,
                            "document"
                        )

                    processed_contents.append(f"Document: {doc.fileName}\n{combined_doc_content}")

            except Exception as e:
                logger.warning(f"Error processing document {doc.fileName}: {str(e)}")
                processed_contents.append(f"Document: {doc.fileName}\n[Error processing document: {str(e)}]")

        return "\n\n---\n\n".join(processed_contents)

    async def _optimize_content_for_model(
        self,
        prompt: str,
        document_content: str,
        model_name: str,
        compress_prompt: bool,
        compress_documents: bool
    ) -> tuple[str, str]:
        """Optimizes the content for the selected model"""

        model_info = AI_MODELS[model_name]
        max_content_size = model_info["context_length"] * 0.7  # 70% reserved for content

        optimized_prompt = prompt
        optimized_content = document_content

        # Compress the prompt if requested
        if compress_prompt and len(prompt.encode('utf-8')) > 2000:  # 2 KB limit for the prompt
            optimized_prompt = await self._compress_content(prompt, 2000, "prompt")

        # Compress the document content if requested
        if compress_documents and document_content:
            content_size = len(document_content.encode('utf-8'))
            if content_size > max_content_size:
                optimized_content = await self._compress_content(
                    document_content,
                    int(max_content_size),
                    "document"
                )

        return optimized_prompt, optimized_content

    async def _compress_content(self, content: str, target_size: int, content_type: str) -> str:
        """Compresses content intelligently based on its type"""

        if len(content.encode('utf-8')) <= target_size:
            return content

        try:
            # Use AI for intelligent compression
            compression_prompt = f"""
            Compress the following {content_type} to at most {target_size} characters,
            but keep all of the important information:

            {content}

            Return only the compressed content, without additional explanations.
            """

            # Use the fastest available model for compression
            compression_model = "openai_gpt35"
            model_info = AI_MODELS[compression_model]
            connector = getattr(self, f"{model_info['connector']}Service")

            messages = [{"role": "user", "content": compression_prompt}]

            if model_info["connector"] == "openai":
                compressed = await connector.callAiBasic(messages)
            else:
                response = await connector.callAiBasic(messages)
                compressed = response["choices"][0]["message"]["content"]

            return compressed

        except Exception as e:
            logger.warning(f"AI compression failed, using truncation: {str(e)}")
            # Fallback: simple truncation. Slice bytes rather than characters,
            # since target_size is a byte budget everywhere else.
            return content.encode('utf-8')[:target_size].decode('utf-8', errors='ignore') + "... [truncated]"

    async def _execute_ai_call_with_failover(
        self,
        model_name: str,
        prompt: str,
        document_content: str
    ) -> str:
        """Executes the AI call with automatic failover"""

        try:
            model_info = AI_MODELS[model_name]
            connector = getattr(self, f"{model_info['connector']}Service")

            # Prepare the messages
            messages = []
            if document_content:
                messages.append({
                    "role": "system",
                    "content": f"Context from documents:\n{document_content}"
                })

            messages.append({
                "role": "user",
                "content": prompt
            })

            # Execute the AI call
            if model_info["connector"] == "openai":
                return await connector.callAiBasic(messages)
            else:  # anthropic
                response = await connector.callAiBasic(messages)
                return response["choices"][0]["message"]["content"]

        except ContextLengthExceededException:
            logger.warning(f"Context length exceeded for {model_name}, trying fallback")
            # Fall back to a model with a larger context
            fallback_model = self._find_fallback_model(model_name)
            if fallback_model:
                return await self._execute_ai_call_with_failover(fallback_model, prompt, document_content)
            else:
                # Last resort: compress the content further
                compressed_prompt = await self._compress_content(prompt, 1000, "prompt")
                compressed_content = await self._compress_content(document_content, 5000, "document")
                return await self._execute_ai_call_with_failover("openai_gpt35", compressed_prompt, compressed_content)

        except Exception as e:
            logger.warning(f"AI call failed with {model_name}: {e}")
            # General fallback; re-raise if the fallback model itself failed,
            # to avoid retrying it in an endless loop
            if model_name == "openai_gpt35":
                raise
            return await self._execute_ai_call_with_failover("openai_gpt35", prompt, document_content)

    def _find_fallback_model(self, current_model: str) -> Optional[str]:
        """Finds a fallback model with a larger context"""
        current_context = AI_MODELS[current_model]["context_length"]

        # Look for a model with a larger context
        for model_name, model_info in AI_MODELS.items():
            if model_info["context_length"] > current_context:
                return model_name

        return None
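
    # Example of the scan above, assuming AI_MODELS keeps its current
    # insertion order: from "openai_gpt35" (16k context) it returns
    # "openai_gpt4o" (128k) rather than "anthropic_claude" (200k), because the
    # first model with a larger context wins, not the largest one.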

    # Legacy methods

    async def callAiTextBasic(self, prompt: str, context: Optional[str] = None) -> str:
        """
        Basic text processing - now uses the centralized AI call method.

        Args:
            prompt: The user prompt to process
            context: Optional system context/prompt

        Returns:
            The AI response as text
        """
        # Combine context with prompt if provided
        full_prompt = prompt
        if context:
            full_prompt = f"Context: {context}\n\nUser Request: {prompt}"

        # Use the centralized AI call with speed priority for basic calls
        return await self.callAi(
            prompt=full_prompt,
            priority="speed",
            compress_prompt=True,
            compress_documents=False
        )

    async def callAiTextAdvanced(self, prompt: str, context: Optional[str] = None, _is_fallback: bool = False) -> str:
        """
        Advanced text processing - now uses the centralized AI call method.

        Args:
            prompt: The user prompt to process
            context: Optional system context/prompt
            _is_fallback: Internal flag (kept for compatibility)

        Returns:
            The AI response as text
        """
        # Combine context with prompt if provided
        full_prompt = prompt
        if context:
            full_prompt = f"Context: {context}\n\nUser Request: {prompt}"

        # Use the centralized AI call with quality priority for advanced calls
        return await self.callAi(
            prompt=full_prompt,
            priority="quality",
            compress_prompt=False,
            compress_documents=False
        )

    async def callAiImageBasic(self, prompt: str, imageData: Union[str, bytes], mimeType: Optional[str] = None) -> str:
        """
        Basic image processing via the OpenAI connector.

        The centralized call method does not handle images yet, so this calls
        the connector directly.

        Args:
            prompt: The prompt for image analysis
            imageData: The image data (file path or bytes)
            mimeType: Optional MIME type of the image

        Returns:
            The AI response as text
        """
        try:
            return await self.openaiService.callAiImage(prompt, imageData, mimeType)
        except Exception as e:
            logger.error(f"Error in OpenAI image call: {str(e)}")
            return f"Error: {str(e)}"

    async def callAiImageAdvanced(self, prompt: str, imageData: Union[str, bytes], mimeType: Optional[str] = None) -> str:
        """
        Advanced image processing via the Anthropic connector.

        The centralized call method does not handle images yet, so this calls
        the connector directly.

        Args:
            prompt: The prompt for image analysis
            imageData: The image data (file path or bytes)
            mimeType: Optional MIME type of the image

        Returns:
            The AI response as text
        """
        try:
            return await self.anthropicService.callAiImage(prompt, imageData, mimeType)
        except Exception as e:
            logger.error(f"Error in Anthropic image call: {str(e)}")
            return f"Error: {str(e)}"

    # Convenience methods for common use cases

    async def callAiForDocumentAnalysis(
        self,
        prompt: str,
        documents: List[ChatDocument],
        priority: str = "balanced"
    ) -> str:
        """Convenience method for document analysis"""
        return await self.callAi(
            prompt=prompt,
            documents=documents,
            operation_type="document_analysis",
            priority=priority,
            compress_documents=True,
            process_documents_individually=False
        )

    async def callAiForReportGeneration(
        self,
        prompt: str,
        documents: List[ChatDocument],
        priority: str = "quality"
    ) -> str:
        """Convenience method for report generation"""
        return await self.callAi(
            prompt=prompt,
            documents=documents,
            operation_type="report_generation",
            priority=priority,
            compress_documents=True,
            process_documents_individually=True
        )

    async def callAiForEmailComposition(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]] = None,
        priority: str = "speed"
    ) -> str:
        """Convenience method for email composition"""
        return await self.callAi(
            prompt=prompt,
            documents=documents,
            operation_type="email_composition",
            priority=priority,
            compress_prompt=True,
            compress_documents=True
        )

    async def callAiForTaskPlanning(
        self,
        prompt: str,
        documents: Optional[List[ChatDocument]] = None,
        priority: str = "balanced"
    ) -> str:
        """Convenience method for task planning"""
        return await self.callAi(
            prompt=prompt,
            documents=documents,
            operation_type="task_planning",
            priority=priority,
            compress_prompt=False,
            compress_documents=True
        )
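

# A minimal usage sketch, kept out of the class on purpose. It assumes the
# OpenAI and Anthropic connectors are configured for this environment (API
# keys etc.); the prompts are illustrative only.
if __name__ == "__main__":
    import asyncio

    async def _demo():
        ai = AiCalls()

        # Speed-priority call: _select_optimal_model should favor the model
        # with the highest speed_rating that fits the content.
        print(await ai.callAi(
            prompt="Summarize the benefits of centralized AI routing in two sentences.",
            priority="speed",
        ))

        # Quality-priority call with a hard cost ceiling; models whose
        # _estimate_cost exceeds max_cost are filtered out.
        print(await ai.callAi(
            prompt="Draft an outline for a quarterly project status report.",
            priority="quality",
            max_cost=0.50,
        ))

    asyncio.run(_demo())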