gateway/gwserver/connector_aichat_anthropic.py
2025-03-25 00:05:01 +01:00

435 lines
No EOL
17 KiB
Python

import os
import json
import logging
import httpx
import base64
import mimetypes
from typing import Dict, Any, List, Optional
from fastapi import HTTPException
import configload as configload
# Logger konfigurieren
logger = logging.getLogger(__name__)
# Konfigurationsdaten laden
def load_config_data():
config = configload.load_config()
return {
"api_key": config.get('Connector_AiAnthropic', 'API_KEY'),
"api_url": config.get('Connector_AiAnthropic', 'API_URL'),
"model_name": config.get('Connector_AiAnthropic', 'MODEL_NAME'),
"temperature": float(config.get('Connector_AiAnthropic', 'TEMPERATURE')),
"max_tokens": int(config.get('Connector_AiAnthropic', 'MAX_TOKENS'))
}
class ChatService:
"""
Connector für die Kommunikation mit der Anthropic API.
"""
def __init__(self):
# Konfiguration laden
self.config = load_config_data()
self.api_key = self.config["api_key"]
self.api_url = self.config["api_url"]
self.model_name = self.config["model_name"]
# HttpClient für API-Aufrufe
self.http_client = httpx.AsyncClient(
timeout=120.0, # Längeres Timeout für komplexe Anfragen
headers={
"x-api-key": self.api_key,
"anthropic-version": "2023-06-01", # Anthropic API Version
"Content-Type": "application/json"
}
)
logger.info(f"Anthropic Connector initialisiert mit Modell: {self.model_name}")
async def call_api(self, messages: List[Dict[str, Any]], temperature: float = None, max_tokens: int = None) -> Dict[str, Any]:
"""
Ruft die Anthropic API mit den gegebenen Nachrichten auf.
Args:
messages: Liste von Nachrichten im OpenAI-Format (role, content)
temperature: Temperatur für die Antwortgenerierung (0.0-1.0)
max_tokens: Maximale Anzahl der Token in der Antwort
Returns:
Die Antwort umgewandelt ins OpenAI-Format
Raises:
HTTPException: Bei Fehlern in der API-Kommunikation
"""
try:
# OpenAI-Format in Anthropic-Format umwandeln
formatted_messages = self._convert_to_anthropic_format(messages)
# Verwende Parameter aus der Konfiguration, falls keine überschrieben wurden
if temperature is None:
temperature = self.config.get("temperature", 0.2)
if max_tokens is None:
max_tokens = self.config.get("max_tokens", 2000)
# Anthropic API Payload erstellen
payload = {
"model": self.model_name,
"messages": formatted_messages,
"temperature": temperature,
"max_tokens": max_tokens
}
response = await self.http_client.post(
self.api_url,
json=payload
)
if response.status_code != 200:
logger.error(f"Anthropic API-Fehler: {response.status_code} - {response.text}")
raise HTTPException(status_code=500, detail="Fehler bei der Kommunikation mit Anthropic API")
# Antwort im Anthropic-Format in OpenAI-Format umwandeln
anthropic_response = response.json()
openai_formatted_response = self._convert_to_openai_format(anthropic_response)
return openai_formatted_response
except Exception as e:
logger.error(f"Fehler beim Aufruf der Anthropic API: {str(e)}")
raise HTTPException(status_code=500, detail=f"Fehler beim Aufruf der Anthropic API: {str(e)}")
def _convert_to_anthropic_format(self, openai_messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Konvertiert Nachrichten vom OpenAI-Format ins Anthropic-Format.
OpenAI verwendet:
[{"role": "system", "content": "..."},
{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."}]
Anthropic verwendet:
[{"role": "user", "content": "..."},
{"role": "assistant", "content": "..."}]
Anmerkung: Anthropic hat kein direktes System-Message-Äquivalent,
daher fügen wir System-Nachrichten in die erste User-Nachricht ein.
"""
anthropic_messages = []
system_content = ""
# Extrahiere zuerst alle System-Nachrichten
for msg in openai_messages:
if msg.get("role") == "system":
system_content += msg.get("content", "") + "\n\n"
# Konvertiere die restlichen Nachrichten
for i, msg in enumerate(openai_messages):
role = msg.get("role")
content = msg.get("content", "")
# System-Nachrichten überspringen (bereits extrahiert)
if role == "system":
continue
# Für die erste User-Nachricht: System-Inhalte voranstellen, falls vorhanden
if role == "user" and system_content and not any(m.get("role") == "user" for m in anthropic_messages):
if isinstance(content, str):
content = system_content + content
elif isinstance(content, list):
# Wenn content ein Array ist (für Multimodal-Nachrichten)
text_parts = []
for part in content:
if part.get("type") == "text":
text_parts.append(part)
if text_parts:
text_parts[0]["text"] = system_content + text_parts[0].get("text", "")
# Anthropic unterstützt nur "user" und "assistant" als Rollen
if role not in ["user", "assistant"]:
role = "user"
anthropic_messages.append({"role": role, "content": content})
return anthropic_messages
def _convert_to_openai_format(self, anthropic_response: Dict[str, Any]) -> Dict[str, Any]:
"""
Konvertiert eine Antwort vom Anthropic-Format ins OpenAI-Format.
Anthropic gibt zurück:
{
"id": "msg_...",
"content": [{"type": "text", "text": "Antworttext"}],
"model": "claude-...",
...
}
OpenAI gibt zurück:
{
"id": "chatcmpl-...",
"object": "chat.completion",
"choices": [
{
"message": {
"role": "assistant",
"content": "Antworttext"
},
"index": 0,
"finish_reason": "stop"
}
],
"model": "gpt-...",
...
}
"""
# Extrahiere Inhalt aus Anthropic-Antwort
content = ""
if "content" in anthropic_response:
if isinstance(anthropic_response["content"], list):
# Inhalt ist eine Liste von Teilen (bei neueren API-Versionen)
for part in anthropic_response["content"]:
if part.get("type") == "text":
content += part.get("text", "")
else:
# Direkter Inhalt als String (bei älteren API-Versionen)
content = anthropic_response["content"]
# Erstelle OpenAI-formatierte Antwort
return {
"id": anthropic_response.get("id", ""),
"object": "chat.completion",
"created": anthropic_response.get("created", 0),
"model": anthropic_response.get("model", self.model_name),
"choices": [
{
"message": {
"role": "assistant",
"content": content
},
"index": 0,
"finish_reason": "stop"
}
]
}
def prepare_file_message_content(self, prompt_text: str, file_paths: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Bereitet eine Nachricht mit Dateien für Anthropic API vor.
Args:
prompt_text: Der Text-Prompt
file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path)
Returns:
Eine für Anthropic-API formatierte content-Liste
"""
message_content = [
{
"type": "text",
"text": prompt_text
}
]
# Füge Dateien als Anhänge hinzu
for file_info in file_paths:
file_path = file_info.get("path", "")
if file_path and os.path.exists(file_path):
try:
# Datei als Base64 codieren
with open(file_path, "rb") as f:
file_data = f.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# MIME-Typ bestimmen
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Content-Type bestimmen (image oder document)
content_type = "image" if mime_type.startswith("image/") else "document"
# Füge die Datei als Anhang hinzu
message_content.append({
"type": content_type,
"source": {
"type": "base64",
"media_type": mime_type,
"data": base64_data
}
})
logger.info(f"Datei {file_info.get('name', 'Unbekannt')} als {content_type} hinzugefügt")
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Datei {file_info.get('name', 'Unbekannt')}: {str(e)}")
return message_content
def parse_filedata(self, file_paths: List[Dict[str, Any]], prompt_text: str = "", file_contents: Dict[str, str] = None) -> Dict[str, Any]:
"""
Bereitet Dateien für die Anthropic API vor und erstellt ein einheitliches Message-Objekt.
Args:
file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path)
prompt_text: Der Text-Prompt, der zusammen mit den Dateien gesendet werden soll
file_contents: Optional vorgelesene Dateiinhalte als Dict[file_id, content]
Returns:
Ein standardisiertes Message-Objekt, das für beide API-Typen verwendet werden kann
"""
# Basisstruktur für die Nachricht
message = {
"role": "user",
"content": []
}
# Text-Prompt hinzufügen
if prompt_text:
message["content"].append({
"type": "text",
"text": prompt_text
})
# Dateien als Anhänge hinzufügen
for file_info in file_paths:
file_path = file_info.get("path", "")
file_name = file_info.get("name", "")
file_id = file_info.get("id", "")
# Prüfen, ob Dateiinhalt bereits vorhanden ist
if file_contents and file_id in file_contents:
# Nur Kontext-Information hinzufügen
message["content"].append({
"type": "text",
"text": f"Datei: {file_name}\n{file_contents[file_id]}"
})
logger.info(f"Vorverarbeiteter Inhalt für Datei {file_name} verwendet")
continue
# Sonst Datei direkt verarbeiten
if file_path and os.path.exists(file_path):
try:
# Datei als Base64 codieren
with open(file_path, "rb") as f:
file_data = f.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# MIME-Typ bestimmen mit python-magic, wenn verfügbar
try:
import magic
mime_type = magic.from_buffer(file_data, mime=True)
except ImportError:
# Fallback auf mimetypes, wenn python-magic nicht verfügbar ist
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
# Fallback auf Dateierweiterung
extension = os.path.splitext(file_path)[1].lower()[1:]
mime_type = get_mime_type_from_extension(extension)
# Content-Type bestimmen (image, document, oder allgemein file)
content_type, _ = determine_content_structure(mime_type)
# Füge die Datei als Anhang hinzu
message["content"].append({
"type": content_type,
"source": {
"type": "base64",
"media_type": mime_type,
"data": base64_data
}
})
logger.info(f"Datei {file_name} als {content_type} hinzugefügt")
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Datei {file_name}: {str(e)}")
message["content"].append({
"type": "text",
"text": f"[Fehler beim Laden der Datei {file_name}: {str(e)}]"
})
else:
# Datei nicht gefunden - Hinweis einfügen
message["content"].append({
"type": "text",
"text": f"[Datei {file_name} nicht verfügbar]"
})
return message
async def close(self):
"""Schließt den HTTP-Client beim Beenden der Anwendung"""
await self.http_client.aclose()
def determine_content_structure(mime_type: str) -> tuple:
"""
Bestimmt den richtigen content_type und die Nachrichtenstruktur basierend auf dem MIME-Typ.
Args:
mime_type: Der MIME-Typ der Datei
Returns:
Tuple mit (content_type, message_structure)
"""
# Bildtypen
if mime_type.startswith("image/"):
return "image", "image"
# Dokumenttypen
document_types = [
"application/pdf",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document", # docx
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", # xlsx
"application/vnd.openxmlformats-officedocument.presentationml.presentation", # pptx
"application/vnd.ms-excel",
"application/vnd.ms-powerpoint",
"application/msword",
"text/csv",
"text/plain",
"application/json",
"application/xml",
"text/html"
]
if any(mime_type.startswith(dt) for dt in document_types) or mime_type in document_types:
return "document", "document"
# Fallback für unbekannte Typen
return "file", "file"
def get_mime_type_from_extension(extension: str) -> str:
"""
Bestimmt den MIME-Typ basierend auf der Dateiendung.
Args:
extension: Die Dateiendung ohne Punkt
Returns:
Der entsprechende MIME-Typ
"""
extension_to_mime = {
"pdf": "application/pdf",
"docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"doc": "application/msword",
"xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"xls": "application/vnd.ms-excel",
"pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
"ppt": "application/vnd.ms-powerpoint",
"csv": "text/csv",
"txt": "text/plain",
"json": "application/json",
"xml": "application/xml",
"html": "text/html",
"htm": "text/html",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"png": "image/png",
"gif": "image/gif",
"webp": "image/webp",
"svg": "image/svg+xml"
}
return extension_to_mime.get(extension, "application/octet-stream")