255 lines
No EOL
10 KiB
Python
255 lines
No EOL
10 KiB
Python
import asyncio
|
|
import uuid
|
|
from typing import List, Dict, Any, Optional
|
|
from datetime import datetime
|
|
import logging
|
|
import json
|
|
import os
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class AgentService:
|
|
"""
|
|
Service für die Verwaltung und Ausführung von Multi-Agent-Workflows.
|
|
In einer Produktionsumgebung würde hier eine Integration mit echten KI-Diensten
|
|
und eine Orchestrierung der verschiedenen Agenten stattfinden.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.workflows = {}
|
|
self.results_dir = "./results"
|
|
os.makedirs(self.results_dir, exist_ok=True)
|
|
|
|
async def execute_workflow(
|
|
self,
|
|
workflow_id: str,
|
|
prompt: str,
|
|
agents: List[Dict[str, Any]],
|
|
files: List[Dict[str, Any]]
|
|
) -> str:
|
|
"""
|
|
Führt einen Workflow mit den angegebenen Agenten und Dateien aus.
|
|
Diese Methode simuliert die Ausführung für Demo-Zwecke.
|
|
|
|
In einer echten Implementierung würde hier die tatsächliche Orchestrierung
|
|
der Agenten und die Verarbeitung der Dateien stattfinden.
|
|
"""
|
|
logger.info(f"Starte Workflow {workflow_id} mit {len(agents)} Agenten und {len(files)} Dateien")
|
|
|
|
# Workflow initialisieren
|
|
self.workflows[workflow_id] = {
|
|
"id": workflow_id,
|
|
"status": "running",
|
|
"progress": 0.0,
|
|
"started_at": datetime.now().isoformat(),
|
|
"completed_at": None,
|
|
"agent_statuses": {},
|
|
"logs": [],
|
|
"results": []
|
|
}
|
|
|
|
# Log-Eintrag für den Start des Workflows
|
|
self._add_log(workflow_id, "Workflow gestartet", "info")
|
|
self._add_log(workflow_id, f"Verarbeite {len(files)} Dateien...", "info")
|
|
|
|
self.workflows[workflow_id]["progress"] = 0.1
|
|
await asyncio.sleep(1) # Simuliere Verarbeitungszeit
|
|
|
|
# Verarbeitung pro Agent simulieren
|
|
for i, agent in enumerate(agents):
|
|
agent_id = agent["id"]
|
|
agent_name = agent["name"]
|
|
agent_type = agent["type"]
|
|
|
|
self.workflows[workflow_id]["agent_statuses"][agent_id] = "running"
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Agent '{agent_name}' beginnt mit der Verarbeitung...",
|
|
"start",
|
|
agent_id,
|
|
agent_name
|
|
)
|
|
|
|
# Simuliere Verarbeitungszeit
|
|
self.workflows[workflow_id]["progress"] = 0.1 + (i + 1) * (0.8 / len(agents)) * 0.5
|
|
await asyncio.sleep(2)
|
|
|
|
# Agent-Ergebnis simulieren
|
|
result = self._generate_simulated_result(workflow_id, agent, i, prompt, files)
|
|
self.workflows[workflow_id]["results"].append(result)
|
|
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Agent '{agent_name}' hat die Verarbeitung abgeschlossen",
|
|
"complete",
|
|
agent_id,
|
|
agent_name
|
|
)
|
|
self.workflows[workflow_id]["agent_statuses"][agent_id] = "completed"
|
|
|
|
self.workflows[workflow_id]["progress"] = 0.1 + (i + 1) * (0.8 / len(agents))
|
|
await asyncio.sleep(1)
|
|
|
|
# Workflow abschließen
|
|
self._add_log(workflow_id, "Alle Agenten haben ihre Aufgaben abgeschlossen", "info")
|
|
self._add_log(workflow_id, "Workflow erfolgreich beendet", "success")
|
|
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Speichere Ergebnisse in Datei für spätere Verwendung
|
|
self._save_workflow_results(workflow_id)
|
|
|
|
return workflow_id
|
|
|
|
def _add_log(
|
|
self,
|
|
workflow_id: str,
|
|
message: str,
|
|
log_type: str,
|
|
agent_id: Optional[str] = None,
|
|
agent_name: Optional[str] = None
|
|
) -> None:
|
|
"""Fügt einen Protokolleintrag zum Workflow hinzu"""
|
|
log_entry = {
|
|
"id": f"log_{uuid.uuid4()}",
|
|
"message": message,
|
|
"type": log_type,
|
|
"timestamp": datetime.now().isoformat(),
|
|
"agent_id": agent_id,
|
|
"agent_name": agent_name
|
|
}
|
|
|
|
workflow = self.workflows.get(workflow_id)
|
|
if workflow:
|
|
workflow["logs"].append(log_entry)
|
|
logger.info(f"Workflow {workflow_id}: {message}")
|
|
|
|
def _generate_simulated_result(
|
|
self,
|
|
workflow_id: str,
|
|
agent: Dict[str, Any],
|
|
index: int,
|
|
prompt: str,
|
|
files: List[Dict[str, Any]]
|
|
) -> Dict[str, Any]:
|
|
"""Generiert ein simuliertes Ergebnis basierend auf dem Agententyp"""
|
|
agent_type = agent["type"]
|
|
agent_id = agent["id"]
|
|
agent_name = agent["name"]
|
|
|
|
# Dateien als Teil des Kontexts einbinden
|
|
file_names = [f["name"] for f in files]
|
|
file_context = f"Basierend auf der Analyse von: {', '.join(file_names)}"
|
|
|
|
result = {
|
|
"id": f"result_{workflow_id}_{index}",
|
|
"agent_id": agent_id,
|
|
"agent_name": agent_name,
|
|
"timestamp": datetime.now().isoformat(),
|
|
"metadata": {
|
|
"files_processed": file_names,
|
|
"prompt": prompt
|
|
}
|
|
}
|
|
|
|
if agent_type == "analyzer":
|
|
result.update({
|
|
"title": "Datenanalyse-Ergebnis",
|
|
"type": "text",
|
|
"content": f"{file_context}\n\nDie Analyse der bereitgestellten Daten zeigt folgende Schlüsselerkenntnisse:\n\n1. Die Umsätze stiegen im Q1 2025 um 12% im Vergleich zum Vorjahr\n2. Produktkategorie A zeigt die stärkste Leistung mit 23% Wachstum\n3. In den Regionen Nord und West wurde das größte Wachstum beobachtet\n4. Die Betriebskosten sind um 5% gesunken, was zu einer verbesserten Marge führt"
|
|
})
|
|
elif agent_type == "visualizer":
|
|
result.update({
|
|
"title": "Umsatzentwicklung nach Quartal",
|
|
"type": "chart",
|
|
"content": "Diagramm der Umsatzentwicklung (simuliert)",
|
|
"metadata": {
|
|
**result["metadata"],
|
|
"chart_type": "line",
|
|
"chart_data": {
|
|
"labels": ["Q1 2024", "Q2 2024", "Q3 2024", "Q4 2024", "Q1 2025"],
|
|
"datasets": [
|
|
{
|
|
"label": "Umsatz (Mio. €)",
|
|
"data": [2.1, 2.4, 2.8, 3.2, 3.6]
|
|
}
|
|
]
|
|
}
|
|
}
|
|
})
|
|
elif agent_type == "writer":
|
|
result.update({
|
|
"title": "Zusammenfassung und Empfehlungen",
|
|
"type": "text",
|
|
"content": f"{file_context}\n\n# Zusammenfassung Q1 2025\n\nDie Unternehmensdaten zeigen ein starkes erstes Quartal mit signifikanten Verbesserungen in allen Geschäftsbereichen. Der Gesamtumsatz stieg um 12%, während die Betriebskosten um 5% sanken, was zu einer verbesserten Gewinnmarge führte.\n\n## Empfehlungen\n\n1. **Investitionen in Produktkategorie A ausbauen** - Diese Kategorie zeigt das stärkste Wachstum und sollte weiter gefördert werden\n2. **Marketingaktivitäten in Süd- und Ostregionen verstärken** - Diese Regionen zeigen Potenzial für Wachstum\n3. **Kostenoptimierungen beibehalten** - Die Maßnahmen zur Kostensenkung haben sich als effektiv erwiesen\n\n## Prognose für Q2 2025\n\nBei gleichbleibenden Marktbedingungen erwarten wir ein weiteres Wachstum von 8-10% im Q2 2025."
|
|
})
|
|
elif agent_type == "scraper":
|
|
result.update({
|
|
"title": "Web-Scraping Ergebnisse",
|
|
"type": "text",
|
|
"content": f"# Marktdaten aus Web-Quellen\n\nBasierend auf der Analyse von 15 führenden Branchenwebsites wurden folgende Trends identifiziert:\n\n1. Der Gesamtmarkt wächst mit einer Rate von 8,5% jährlich\n2. Hauptwettbewerber A und B haben kürzlich neue Produktlinien gestartet\n3. Die durchschnittlichen Marktpreise sind in den letzten 6 Monaten um 3,2% gestiegen"
|
|
})
|
|
else:
|
|
result.update({
|
|
"title": "Verarbeitungsergebnis",
|
|
"type": "text",
|
|
"content": f"Allgemeines Ergebnis der Verarbeitung durch {agent_name}"
|
|
})
|
|
|
|
return result
|
|
|
|
def _save_workflow_results(self, workflow_id: str) -> None:
|
|
"""Speichert die Workflow-Ergebnisse in einer Datei"""
|
|
workflow = self.workflows.get(workflow_id)
|
|
if workflow:
|
|
try:
|
|
file_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json")
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
json.dump(workflow, f, indent=2, ensure_ascii=False)
|
|
logger.info(f"Workflow-Ergebnisse gespeichert: {file_path}")
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Speichern der Workflow-Ergebnisse: {e}")
|
|
|
|
def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Gibt den Status eines Workflows zurück"""
|
|
workflow = self.workflows.get(workflow_id)
|
|
if not workflow:
|
|
return None
|
|
|
|
return {
|
|
"id": workflow["id"],
|
|
"status": workflow["status"],
|
|
"progress": workflow["progress"],
|
|
"started_at": workflow["started_at"],
|
|
"completed_at": workflow["completed_at"],
|
|
"agent_statuses": workflow["agent_statuses"]
|
|
}
|
|
|
|
def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
|
|
"""Gibt die Protokolle eines Workflows zurück"""
|
|
workflow = self.workflows.get(workflow_id)
|
|
if not workflow:
|
|
return None
|
|
|
|
return workflow["logs"]
|
|
|
|
def get_workflow_results(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
|
|
"""Gibt die Ergebnisse eines Workflows zurück"""
|
|
workflow = self.workflows.get(workflow_id)
|
|
if not workflow:
|
|
return None
|
|
|
|
return workflow["results"]
|
|
|
|
|
|
# Singleton-Instanz des AgentService
|
|
_agent_service_instance = None
|
|
|
|
def get_agent_service():
|
|
"""Gibt eine Singleton-Instanz des AgentService zurück"""
|
|
global _agent_service_instance
|
|
if _agent_service_instance is None:
|
|
_agent_service_instance = AgentService()
|
|
return _agent_service_instance |