gateway/gwserver/modules/agentservice_interface.py
2025-03-20 00:46:32 +01:00

752 lines
No EOL
34 KiB
Python

import asyncio
import uuid
import os
import json
import logging
import base64
import mimetypes
from typing import List, Dict, Any, Optional
from datetime import datetime
from fastapi import HTTPException
import configload as configload
# Logger konfigurieren
logger = logging.getLogger(__name__)
# Konfigurationsdaten laden
def load_config_data():
config = configload.load_config()
result = {
"application": {
"debug": config.get('Module_AgentserviceInterface', 'DEBUG'),
"upload_dir": config.get('Module_AgentserviceInterface', 'UPLOAD_DIR'),
"results_dir": config.get('Module_AgentserviceInterface', 'RESULTS_DIR'),
"max_history": config.get('Module_AgentserviceInterface', 'MAX_HISTORY'),
"ai_provider": config.get('Module_AgentserviceInterface', 'AI_PROVIDER', fallback="openai") # Standard ist OpenAI
}
}
# Debug-Modus einstellen
if result["application"]["debug"].lower() == "true":
logging.basicConfig(level=logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.debug("Debug-Modus aktiviert")
return result
class AgentService:
"""
Service für die Verwaltung und Ausführung von Multi-Agent-Workflows mit verschiedenen Modellen.
"""
def __init__(self, mandate_id: int = None, user_id: int = None):
"""
Initialisiert den AgentService.
Args:
mandate_id: ID des aktuellen Mandanten (optional)
user_id: ID des aktuellen Benutzers (optional)
"""
# Mandanten- und Benutzerkontext
self.mandate_id = mandate_id
self.user_id = user_id
# Konfiguration laden
self.config = load_config_data()
# Verzeichnisse aus der Konfiguration übernehmen
self.results_dir = self.config["application"]["results_dir"]
self.upload_dir = self.config["application"]["upload_dir"]
self.max_history = int(self.config["application"]["max_history"])
# AI Provider aus der Konfiguration übernehmen
self.ai_provider = self.config["application"]["ai_provider"].lower()
# Verzeichnisse erstellen
os.makedirs(self.results_dir, exist_ok=True)
os.makedirs(self.upload_dir, exist_ok=True)
# Connector-Instanzen initialisieren
if self.ai_provider == "anthropic":
import connector_aichat_anthropic as service_aichat
self.service_aichat = service_aichat.ChatService()
logger.info("Anthropic AI Provider wird verwendet")
else:
import connector_aichat_openai as service_aichat
self.service_aichat = service_aichat.ChatService()
logger.info("OpenAI AI Provider wird verwendet")
import connector_aiweb_webscraping as service_aiscrap
self.service_aiscrap = service_aiscrap.WebScrapingService()
logger.info(f"AgentService initialisiert mit:")
logger.info(f" - AI Provider: {self.ai_provider}")
logger.info(f" - Ergebnisverzeichnis: {self.results_dir}")
logger.info(f" - Upload-Verzeichnis: {self.upload_dir}")
# Workflow-Speicher
self.workflows = {}
async def execute_workflow(
self,
workflow_id: str,
prompt: str,
agents: List[Dict[str, Any]],
files: List[Dict[str, Any]]
) -> str:
"""
Führt einen Workflow mit den angegebenen Agenten und Dateien aus.
Anstatt die Agenten der Reihe nach abzuarbeiten, wird ein AI-Moderator verwendet,
der die Agenten basierend auf ihren Fähigkeiten und bisherigen Antworten steuert.
"""
logger.info(f"Starte Workflow {workflow_id} mit {len(agents)} Agenten und {len(files)} Dateien")
# Mandanten- und Benutzerkontext in die Workflow-Daten aufnehmen
self.workflows[workflow_id] = {
"id": workflow_id,
"mandate_id": self.mandate_id,
"user_id": self.user_id,
"status": "running",
"progress": 0.0,
"started_at": datetime.now().isoformat(),
"completed_at": None,
"agent_statuses": {},
"logs": [],
"results": []
}
# Log-Eintrag für den Start des Workflows
self._add_log(workflow_id, "Workflow gestartet", "info")
self._add_log(workflow_id, f"Verarbeite {len(files)} Dateien...", "info")
# Dateikontexte und Inhalte vorbereiten
file_contexts = []
file_contents = {}
for file in files:
file_id = file["id"]
file_name = file["name"]
file_type = file["type"]
file_path = file.get("path", "")
# Wenn kein Pfad angegeben ist, versuche, ihn aus dem Upload-Verzeichnis abzuleiten
if not file_path and file_name:
possible_path = os.path.join(self.upload_dir, file_name)
if os.path.exists(possible_path):
file_path = possible_path
logger.debug(f"Pfad für Datei {file_name} gefunden: {file_path}")
file_contexts.append({
"id": file_id,
"name": file_name,
"type": file_type,
"size": file.get("size", "Unbekannt"),
"path": file_path
})
# Dateiinhalt lesen, wenn der Pfad verfügbar ist
if file_path and os.path.exists(file_path):
try:
# Text-basierte Dateien direkt lesen
if file_type == "document":
# Einfache Textdateien
if file_name.endswith(('.txt', '.csv', '.md', '.json')):
with open(file_path, 'r', encoding='utf-8') as f:
file_contents[file_id] = f.read()
self._add_log(workflow_id, f"Datei {file_name} gelesen", "info")
# Excel-Dateien
elif file_name.endswith(('.xlsx', '.xls')):
import pandas as pd
try:
df = pd.read_excel(file_path)
file_contents[file_id] = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
file_contents[file_id] += "Erste 5 Zeilen:\n"
file_contents[file_id] += df.head(5).to_string()
self._add_log(workflow_id, f"Excel-Datei {file_name} gelesen", "info")
except Exception as e:
self._add_log(workflow_id, f"Fehler beim Lesen der Excel-Datei {file_name}: {str(e)}", "error")
# CSV-Dateien
elif file_name.endswith('.csv'):
import pandas as pd
try:
df = pd.read_csv(file_path)
file_contents[file_id] = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
file_contents[file_id] += "Erste 5 Zeilen:\n"
file_contents[file_id] += df.head(5).to_string()
self._add_log(workflow_id, f"CSV-Datei {file_name} gelesen", "info")
except Exception as e:
self._add_log(workflow_id, f"Fehler beim Lesen der CSV-Datei {file_name}: {str(e)}", "error")
# PDF-Dateien
elif file_name.endswith('.pdf'):
try:
# Falls PyPDF2 installiert ist
try:
from PyPDF2 import PdfReader
reader = PdfReader(file_path)
text = ""
for page in reader.pages:
text += page.extract_text() + "\n\n"
file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text[:2000]}..."
self._add_log(workflow_id, f"PDF-Datei {file_name} gelesen", "info")
except ImportError:
self._add_log(workflow_id, "PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning")
file_contents[file_id] = f"PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)"
except Exception as e:
self._add_log(workflow_id, f"Fehler beim Lesen der PDF-Datei {file_name}: {str(e)}", "error")
# Andere Dokumenttypen
else:
self._add_log(workflow_id, f"Nicht unterstütztes Dokumentformat: {file_name}", "warning")
file_contents[file_id] = f"Dateiinhalt nicht verfügbar (Nicht unterstütztes Format)"
# Bilddateien werden nicht direkt gelesen, nur Metadaten gespeichert
elif file_type == "image":
file_contents[file_id] = f"Bilddatei: {file_name} (Inhalt nicht als Text verfügbar)"
except Exception as e:
logger.error(f"Fehler beim Lesen der Datei {file_name}: {str(e)}")
self._add_log(workflow_id, f"Fehler beim Lesen der Datei {file_name}: {str(e)}", "error")
else:
if file_path:
self._add_log(workflow_id, f"Datei {file_name} nicht gefunden: {file_path}", "warning")
else:
self._add_log(workflow_id, f"Kein Pfad für Datei {file_name} verfügbar", "warning")
file_contents[file_id] = f"Dateiinhalt nicht verfügbar"
# Erstelle einen Kontext mit Dateiliste und Inhalten für leichteren Zugriff
file_context_text = "Verfügbare Dateien:\n" + "\n".join([f"- {file['name']} ({file['type']}, {file['size']})" for file in file_contexts])
# Füge Dateiinhalte hinzu (mit Längenbegrenzung)
for file_id, content in file_contents.items():
file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), "Unbekannte Datei")
file_context_text += f"\n\n==== DATEIINHALT: {file_name} ====\n"
# Begrenze den Inhalt, um Token-Limits zu respektieren
max_content_length = 5000 # Anpassen je nach Anzahl der Dateien und Umfang
if len(content) > max_content_length:
file_context_text += content[:max_content_length] + "...\n[Dateiinhalt gekürzt aus Platzgründen]"
else:
file_context_text += content
self.workflows[workflow_id]["progress"] = 0.1
# Initialisiere den Chatverlauf für den Agenten-Dialog
chat_history = []
# Erstelle das Nachrichtenobjekt für die initialen Dateien und den Prompt
message_content = self.service_aichat.prepare_file_message_content(prompt, file_contexts)
# Füge Dateien als Base64-Anhänge hinzu
for file in file_contexts:
if file["path"] and os.path.exists(file["path"]):
try:
# Datei als Base64 codieren
with open(file["path"], "rb") as f:
file_data = f.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# MIME-Typ bestimmen
mime_type, _ = mimetypes.guess_type(file["path"])
if not mime_type:
mime_type = "application/octet-stream"
# Füge die Datei als Anhang hinzu
message_content.append({
"type": "file",
"source": {
"type": "base64",
"media_type": mime_type,
"data": base64_data
}
})
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Datei {file['name']} als Anhang: {str(e)}")
# Nachrichtenobjekt erstellen
initial_message = {
"role": "user",
"content": message_content
}
# Initialen Prompt zum Chatverlauf hinzufügen
chat_history.append(initial_message)
# Initialisiere die verfügbaren Agenten mit ihren Fähigkeiten
available_agents = {}
for agent in agents:
agent_id = agent["id"]
agent_name = agent["name"]
agent_type = agent["type"]
agent_capabilities = agent.get("capabilities", "")
available_agents[agent_id] = {
"id": agent_id,
"name": agent_name,
"type": agent_type,
"capabilities": agent_capabilities,
"used": False
}
# Initialisiere den Status
self.workflows[workflow_id]["agent_statuses"][agent_id] = "pending"
# Initialisiere die Moderator-Rolle - Fester Teil
moderator_prompt_base = """
Du bist der Moderator eines Multi-Agent-Systems. Deine Aufgabe ist es, die Zusammenarbeit zwischen verschiedenen spezialisierten Agenten zu koordinieren, um die Anfrage des Benutzers bestmöglich zu erfüllen.
Du sollst:
1. Die Anfrage des Benutzers verstehen und analysieren
2. Den am besten geeigneten Agenten basierend auf seinen Fähigkeiten auswählen
3. Die Antworten der Agenten überwachen und bewerten
4. Falls nötig, weitere Agenten hinzuziehen, um die Anfrage vollständig zu bearbeiten
5. Den Workflow beenden, wenn die Anfrage vollständig erfüllt wurde
Für jeden Schritt sollst du begründen, warum du einen bestimmten Agenten auswählst, und zusammenfassen, was bisher erreicht wurde.
"""
# Dynamischer Teil - Verfügbare Agenten aus den tatsächlich vorhandenen Agenten
agents_description = "Verfügbare Agenten:\n"
for agent_id, agent in available_agents.items():
agents_description += f"- {agent['name']} (Typ: {agent['type']}): {agent['capabilities']}\n"
moderator_prompt_end = """
Beende den Workflow, wenn die Aufgabe erfüllt ist oder keine weiteren Agenten zur Bearbeitung beitragen können.
"""
# Kombiniere alle Teile
moderator_system_prompt = moderator_prompt_base + "\n" + agents_description + "\n" + moderator_prompt_end
# Starte den Workflow mit dem Moderator
self._add_log(workflow_id, "Starte Agenten-Tischrunde mit Moderator", "info")
# Maximale Anzahl der Runden zur Vermeidung endloser Schleifen
max_rounds = 12
current_round = 0
workflow_complete = False
while current_round < max_rounds and not workflow_complete:
current_round += 1
self._add_log(workflow_id, f"Starte Runde {current_round}", "info")
# Der Moderator wählt den nächsten Agenten aus
moderator_prompt = {
"role": "system",
"content": moderator_system_prompt
}
# Kopie des Chatverlaufs für den Moderator erstellen
moderator_chat = [moderator_prompt] + chat_history[-self.max_history:]
# Füge eine Zusammenfassung der verfügbaren Agenten hinzu
agent_info = "Verfügbare Agenten:\n"
for agent_id, agent in available_agents.items():
status = "✓ Bereits verwendet" if agent["used"] else "✗ Noch nicht verwendet"
agent_info += f"- {agent['name']} (Typ: {agent['type']}): {agent['capabilities']}\n Status: {status}\n"
moderator_chat.append({
"role": "system",
"content": agent_info + "\nWähle den nächsten Agenten aus oder beende den Workflow, wenn die Aufgabe erfüllt ist."
})
# Moderator trifft die Entscheidung
try:
moderator_decision = await self.service_aichat.call_api(moderator_chat)
moderator_text = moderator_decision["choices"][0]["message"]["content"]
# Füge die Entscheidung des Moderators zum Chatverlauf hinzu
chat_history.append({
"role": "assistant",
"content": f"[Moderator] {moderator_text}"
})
# Log der Moderator-Entscheidung
self._add_log(workflow_id, f"Moderator-Entscheidung: {moderator_text[:100]}...", "info")
# Prüfe, ob der Workflow beendet werden soll
if any(phrase in moderator_text.lower() for phrase in ["workflow beenden", "aufgabe erfüllt", "beende den workflow", "workflow abschließen"]):
self._add_log(workflow_id, "Moderator hat den Workflow beendet", "success")
workflow_complete = True
break
# Extrahiere den ausgewählten Agenten
selected_agent_id = None
# Versuche, den ausgewählten Agenten aus dem Text zu extrahieren
for agent_id, agent in available_agents.items():
if agent["name"] in moderator_text or f"Agent {agent_id}" in moderator_text:
selected_agent_id = agent_id
break
if not selected_agent_id:
self._add_log(workflow_id, "Moderator konnte keinen Agenten identifizieren", "warning")
# Wähle den ersten nicht verwendeten Agenten
for agent_id, agent in available_agents.items():
if not agent["used"]:
selected_agent_id = agent_id
break
# Wenn alle Agenten bereits verwendet wurden, wähle den Initialisierungs-Agenten
if not selected_agent_id:
for agent_id, agent in available_agents.items():
if agent["type"] == "initialisierung":
selected_agent_id = agent_id
break
# Als letztes Mittel wähle einfach den ersten Agenten
if not selected_agent_id and available_agents:
selected_agent_id = list(available_agents.keys())[0]
if selected_agent_id:
# Agenten aus der Liste markieren
selected_agent = available_agents[selected_agent_id]
selected_agent["used"] = True
# Agenten-Status aktualisieren
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "running"
self._add_log(
workflow_id,
f"Agent '{selected_agent['name']}' beginnt mit der Verarbeitung...",
"start",
selected_agent_id,
selected_agent['name']
)
# Agent-spezifische Anweisungen erstellen
agent_instructions = self._get_agent_instructions(selected_agent["type"])
# Agent-Prompt erstellen
agent_prompt = {
"role": "system",
"content": f"""
# Aufgabe
Du bist ein spezialisierter Agent vom Typ {selected_agent['type']} mit dem Namen {selected_agent['name']}.
{agent_instructions}
Bitte analysiere den Chatverlauf und die Dateien und beantworte die Anfrage gemäß deiner Rolle.
Ausgabeformat:
[Agent: {selected_agent['name']}]
Deine Antwort...
"""
}
# Kopie des Chatverlaufs für den Agenten erstellen
agent_chat = [agent_prompt] + chat_history[-self.max_history:]
# Falls der Agent ein Webscraper ist und Scraping notwendig ist
if selected_agent["type"] == "scraper":
self._add_log(workflow_id, "Führe Web-Scraping durch...", "info", selected_agent_id, selected_agent["name"])
web_data = await self.service_aiscrap.scrape_web_data(prompt)
if web_data:
agent_chat.append({
"role": "system",
"content": f"# Gescrapte Web-Daten\n{web_data}"
})
self._add_log(workflow_id, "Web-Scraping abgeschlossen", "info", selected_agent_id, selected_agent["name"])
# Agent führt seinen Teil aus
try:
agent_response = await self.service_aichat.call_api(agent_chat)
agent_text = agent_response["choices"][0]["message"]["content"]
# Füge die Antwort des Agenten zum Chatverlauf hinzu
chat_history.append({
"role": "assistant",
"content": agent_text
})
# Agent-Ergebnis erstellen
result = self._create_agent_result(
workflow_id,
selected_agent,
len(self.workflows[workflow_id]["results"]),
prompt,
file_contexts,
agent_text
)
self.workflows[workflow_id]["results"].append(result)
self._add_log(
workflow_id,
f"Agent '{selected_agent['name']}' hat die Verarbeitung abgeschlossen",
"complete",
selected_agent_id,
selected_agent["name"]
)
# Agenten-Status aktualisieren
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "completed"
except Exception as e:
logger.error(f"Fehler bei der Ausführung von Agent '{selected_agent['name']}': {str(e)}")
self._add_log(
workflow_id,
f"Fehler bei der Ausführung: {str(e)}",
"error",
selected_agent_id,
selected_agent["name"]
)
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "failed"
# Füge die Fehlermeldung zum Chatverlauf hinzu
chat_history.append({
"role": "assistant",
"content": f"[Fehler bei Agent '{selected_agent['name']}']: {str(e)}"
})
else:
self._add_log(workflow_id, "Kein Agent ausgewählt. Beende Workflow.", "warning")
workflow_complete = True
# Fortschritt aktualisieren
self.workflows[workflow_id]["progress"] = min(0.9, 0.1 + (current_round / max_rounds) * 0.8)
except Exception as e:
logger.error(f"Fehler in der Moderator-Phase: {str(e)}")
self._add_log(workflow_id, f"Fehler in der Moderator-Phase: {str(e)}", "error")
break
# Workflow abschließen
if workflow_complete:
self.workflows[workflow_id]["status"] = "completed"
self._add_log(workflow_id, "Workflow erfolgreich beendet", "success")
elif current_round >= max_rounds:
self.workflows[workflow_id]["status"] = "completed"
self._add_log(workflow_id, f"Workflow nach {max_rounds} Runden automatisch beendet", "info")
else:
self.workflows[workflow_id]["status"] = "failed"
self._add_log(workflow_id, "Workflow mit Fehlern beendet", "error")
self.workflows[workflow_id]["progress"] = 1.0
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
# Speichere Ergebnisse in Datei für spätere Verwendung
self._save_workflow_results(workflow_id)
return workflow_id
def _get_agent_instructions(self, agent_type: str) -> str:
"""
Gibt agententypspezifische Anweisungen zurück, die aus der agents.json geladen werden.
Falls die Datei nicht existiert oder der Agententyp nicht gefunden wird,
wird ein Standardagent zurückgegeben.
"""
try:
# Pfad zur agents.json-Datei
agents_file = os.path.join(os.path.dirname(__file__), 'data', 'agents.json')
# Überprüfen, ob die Datei existiert
if not os.path.exists(agents_file):
logger.warning(f"Agents-Definitionen nicht gefunden: {agents_file}")
return self._get_default_agent_instructions()
# Datei lesen
with open(agents_file, 'r', encoding='utf-8') as f:
agents_data = json.load(f)
# Nach dem Agententyp suchen
for agent in agents_data:
if agent.get("type") == agent_type:
# Anweisungen zurückgeben, wenn vorhanden
instructions = agent.get("instructions")
if instructions:
logger.debug(f"Anweisungen für Agent-Typ '{agent_type}' aus agents.json geladen")
return instructions
# Wenn kein passender Agent gefunden wurde, Standardanweisungen verwenden
logger.warning(f"Keine Anweisungen für Agent-Typ '{agent_type}' in agents.json gefunden")
return self._get_default_agent_instructions()
except Exception as e:
logger.error(f"Fehler beim Laden der Agent-Anweisungen aus agents.json: {e}")
return self._get_default_agent_instructions()
def _get_default_agent_instructions(self) -> str:
"""
Gibt Standard-Anweisungen für einen Agenten zurück,
wenn keine spezifischen Anweisungen in der agents.json gefunden wurden.
Diese Funktion gibt generische Anweisungen zurück, unabhängig vom Agententyp.
"""
return """
Als Agent ist es deine Aufgabe, Anfragen zu analysieren und entsprechend deinen Fähigkeiten zu bearbeiten.
Folge diesen allgemeinen Anweisungen:
1. Verstehe die Anfrage gründlich
2. Analysiere relevante Daten und Informationen
3. Liefere präzise und hilfreiche Antworten
4. Strukturiere deine Antwort klar und verständlich
In deiner Antwort:
- Beginne mit einer Zusammenfassung der Anfrage
- Gib gut begründete Antworten oder Empfehlungen
- Führe wichtige Erkenntnisse klar auf
- Schließe mit konkreten nächsten Schritten oder Empfehlungen ab
"""
def _add_log(
self,
workflow_id: str,
message: str,
log_type: str,
agent_id: Optional[str] = None,
agent_name: Optional[str] = None
) -> None:
"""Fügt einen Protokolleintrag zum Workflow hinzu"""
log_entry = {
"id": f"log_{uuid.uuid4()}",
"mandate_id": self.mandate_id,
"user_id": self.user_id,
"message": message,
"type": log_type,
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"agent_name": agent_name
}
workflow = self.workflows.get(workflow_id)
if workflow:
workflow["logs"].append(log_entry)
logger.info(f"Workflow {workflow_id}: {message}")
def _create_agent_result(
self,
workflow_id: str,
agent: Dict[str, Any],
index: int,
prompt: str,
file_contexts: List[Dict[str, Any]],
content: str
) -> Dict[str, Any]:
"""Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort"""
agent_type = agent["type"]
agent_id = agent["id"]
agent_name = agent["name"]
# Grundlegende Ergebnisstruktur
result = {
"id": f"result_{workflow_id}_{index}",
"mandate_id": self.mandate_id,
"user_id": self.user_id,
"agent_id": agent_id,
"agent_name": agent_name,
"timestamp": datetime.now().isoformat(),
"type": "text", # Standardtyp
"metadata": {
"files_processed": [file["name"] for file in file_contexts],
"prompt": prompt
}
}
# Titel und Inhalt basierend auf dem Agententyp anpassen
if agent_type == "analyzer":
result.update({
"title": "Datenanalyse-Ergebnis",
"content": content,
})
elif agent_type == "visualizer":
result.update({
"title": "Visualisierungsvorschlag",
"content": content,
"type": "chart" # Auch wenn kein echtes Diagramm, markieren wir es als solches
})
elif agent_type == "writer":
result.update({
"title": "Zusammenfassung und Empfehlungen",
"content": content,
})
elif agent_type == "scraper":
result.update({
"title": "Web-Recherche Ergebnisse",
"content": content,
})
elif agent_type == "initialisierung":
result.update({
"title": "Direkte Antwort",
"content": content,
})
elif agent_type == "organisator":
result.update({
"title": "Aufgabenstrukturierung",
"content": content,
})
elif agent_type == "entwickler":
result.update({
"title": "Code und Ausführungsergebnisse",
"content": content,
})
else:
result.update({
"title": f"Ergebnis von {agent_name}",
"content": content,
})
return result
def _save_workflow_results(self, workflow_id: str) -> None:
"""Speichert die Workflow-Ergebnisse in einer Datei"""
workflow = self.workflows.get(workflow_id)
if workflow:
try:
file_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json")
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(workflow, f, indent=2, ensure_ascii=False)
logger.info(f"Workflow-Ergebnisse gespeichert: {file_path}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Workflow-Ergebnisse: {e}")
def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]:
"""Gibt den Status eines Workflows zurück"""
workflow = self.workflows.get(workflow_id)
if not workflow:
return None
return {
"id": workflow["id"],
"mandate_id": workflow.get("mandate_id"),
"user_id": workflow.get("user_id"),
"status": workflow["status"],
"progress": workflow["progress"],
"started_at": workflow["started_at"],
"completed_at": workflow["completed_at"],
"agent_statuses": workflow["agent_statuses"]
}
def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
"""Gibt die Protokolle eines Workflows zurück"""
workflow = self.workflows.get(workflow_id)
if not workflow:
return None
return workflow["logs"]
def get_workflow_results(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
"""Gibt die Ergebnisse eines Workflows zurück"""
workflow = self.workflows.get(workflow_id)
if not workflow:
return None
return workflow["results"]
async def close(self):
"""Schließt die HTTP-Clients beim Beenden der Anwendung"""
await self.service_aichat.close()
await self.service_aiscrap.close()
# Singleton-Factory für AgentService-Instanzen pro Kontext
_agent_service_instances = {}
def get_agentservice_interface(mandate_id: int = None, user_id: int = None) -> AgentService:
"""
Gibt eine AgentService-Instanz für den angegebenen Kontext zurück.
Wiederverwendet bestehende Instanzen.
"""
context_key = f"{mandate_id}_{user_id}"
if context_key not in _agent_service_instances:
_agent_service_instances[context_key] = AgentService(mandate_id, user_id)
return _agent_service_instances[context_key]