import asyncio import uuid from typing import List, Dict, Any, Optional from datetime import datetime import logging import json import os logger = logging.getLogger(__name__) class AgentService: """ Service für die Verwaltung und Ausführung von Multi-Agent-Workflows. In einer Produktionsumgebung würde hier eine Integration mit echten KI-Diensten und eine Orchestrierung der verschiedenen Agenten stattfinden. """ def __init__(self): self.workflows = {} self.results_dir = "./results" os.makedirs(self.results_dir, exist_ok=True) async def execute_workflow( self, workflow_id: str, prompt: str, agents: List[Dict[str, Any]], files: List[Dict[str, Any]] ) -> str: """ Führt einen Workflow mit den angegebenen Agenten und Dateien aus. Diese Methode simuliert die Ausführung für Demo-Zwecke. In einer echten Implementierung würde hier die tatsächliche Orchestrierung der Agenten und die Verarbeitung der Dateien stattfinden. """ logger.info(f"Starte Workflow {workflow_id} mit {len(agents)} Agenten und {len(files)} Dateien") # Workflow initialisieren self.workflows[workflow_id] = { "id": workflow_id, "status": "running", "progress": 0.0, "started_at": datetime.now().isoformat(), "completed_at": None, "agent_statuses": {}, "logs": [], "results": [] } # Log-Eintrag für den Start des Workflows self._add_log(workflow_id, "Workflow gestartet", "info") self._add_log(workflow_id, f"Verarbeite {len(files)} Dateien...", "info") self.workflows[workflow_id]["progress"] = 0.1 await asyncio.sleep(1) # Simuliere Verarbeitungszeit # Verarbeitung pro Agent simulieren for i, agent in enumerate(agents): agent_id = agent["id"] agent_name = agent["name"] agent_type = agent["type"] self.workflows[workflow_id]["agent_statuses"][agent_id] = "running" self._add_log( workflow_id, f"Agent '{agent_name}' beginnt mit der Verarbeitung...", "start", agent_id, agent_name ) # Simuliere Verarbeitungszeit self.workflows[workflow_id]["progress"] = 0.1 + (i + 1) * (0.8 / len(agents)) * 0.5 await asyncio.sleep(2) # Agent-Ergebnis simulieren result = self._generate_simulated_result(workflow_id, agent, i, prompt, files) self.workflows[workflow_id]["results"].append(result) self._add_log( workflow_id, f"Agent '{agent_name}' hat die Verarbeitung abgeschlossen", "complete", agent_id, agent_name ) self.workflows[workflow_id]["agent_statuses"][agent_id] = "completed" self.workflows[workflow_id]["progress"] = 0.1 + (i + 1) * (0.8 / len(agents)) await asyncio.sleep(1) # Workflow abschließen self._add_log(workflow_id, "Alle Agenten haben ihre Aufgaben abgeschlossen", "info") self._add_log(workflow_id, "Workflow erfolgreich beendet", "success") self.workflows[workflow_id]["status"] = "completed" self.workflows[workflow_id]["progress"] = 1.0 self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat() # Speichere Ergebnisse in Datei für spätere Verwendung self._save_workflow_results(workflow_id) return workflow_id def _add_log( self, workflow_id: str, message: str, log_type: str, agent_id: Optional[str] = None, agent_name: Optional[str] = None ) -> None: """Fügt einen Protokolleintrag zum Workflow hinzu""" log_entry = { "id": f"log_{uuid.uuid4()}", "message": message, "type": log_type, "timestamp": datetime.now().isoformat(), "agent_id": agent_id, "agent_name": agent_name } workflow = self.workflows.get(workflow_id) if workflow: workflow["logs"].append(log_entry) logger.info(f"Workflow {workflow_id}: {message}") def _generate_simulated_result( self, workflow_id: str, agent: Dict[str, Any], index: int, prompt: str, files: List[Dict[str, Any]] ) -> Dict[str, Any]: """Generiert ein simuliertes Ergebnis basierend auf dem Agententyp""" agent_type = agent["type"] agent_id = agent["id"] agent_name = agent["name"] # Dateien als Teil des Kontexts einbinden file_names = [f["name"] for f in files] file_context = f"Basierend auf der Analyse von: {', '.join(file_names)}" result = { "id": f"result_{workflow_id}_{index}", "agent_id": agent_id, "agent_name": agent_name, "timestamp": datetime.now().isoformat(), "metadata": { "files_processed": file_names, "prompt": prompt } } if agent_type == "analyzer": result.update({ "title": "Datenanalyse-Ergebnis", "type": "text", "content": f"{file_context}\n\nDie Analyse der bereitgestellten Daten zeigt folgende Schlüsselerkenntnisse:\n\n1. Die Umsätze stiegen im Q1 2025 um 12% im Vergleich zum Vorjahr\n2. Produktkategorie A zeigt die stärkste Leistung mit 23% Wachstum\n3. In den Regionen Nord und West wurde das größte Wachstum beobachtet\n4. Die Betriebskosten sind um 5% gesunken, was zu einer verbesserten Marge führt" }) elif agent_type == "visualizer": result.update({ "title": "Umsatzentwicklung nach Quartal", "type": "chart", "content": "Diagramm der Umsatzentwicklung (simuliert)", "metadata": { **result["metadata"], "chart_type": "line", "chart_data": { "labels": ["Q1 2024", "Q2 2024", "Q3 2024", "Q4 2024", "Q1 2025"], "datasets": [ { "label": "Umsatz (Mio. €)", "data": [2.1, 2.4, 2.8, 3.2, 3.6] } ] } } }) elif agent_type == "writer": result.update({ "title": "Zusammenfassung und Empfehlungen", "type": "text", "content": f"{file_context}\n\n# Zusammenfassung Q1 2025\n\nDie Unternehmensdaten zeigen ein starkes erstes Quartal mit signifikanten Verbesserungen in allen Geschäftsbereichen. Der Gesamtumsatz stieg um 12%, während die Betriebskosten um 5% sanken, was zu einer verbesserten Gewinnmarge führte.\n\n## Empfehlungen\n\n1. **Investitionen in Produktkategorie A ausbauen** - Diese Kategorie zeigt das stärkste Wachstum und sollte weiter gefördert werden\n2. **Marketingaktivitäten in Süd- und Ostregionen verstärken** - Diese Regionen zeigen Potenzial für Wachstum\n3. **Kostenoptimierungen beibehalten** - Die Maßnahmen zur Kostensenkung haben sich als effektiv erwiesen\n\n## Prognose für Q2 2025\n\nBei gleichbleibenden Marktbedingungen erwarten wir ein weiteres Wachstum von 8-10% im Q2 2025." }) elif agent_type == "scraper": result.update({ "title": "Web-Scraping Ergebnisse", "type": "text", "content": f"# Marktdaten aus Web-Quellen\n\nBasierend auf der Analyse von 15 führenden Branchenwebsites wurden folgende Trends identifiziert:\n\n1. Der Gesamtmarkt wächst mit einer Rate von 8,5% jährlich\n2. Hauptwettbewerber A und B haben kürzlich neue Produktlinien gestartet\n3. Die durchschnittlichen Marktpreise sind in den letzten 6 Monaten um 3,2% gestiegen" }) else: result.update({ "title": "Verarbeitungsergebnis", "type": "text", "content": f"Allgemeines Ergebnis der Verarbeitung durch {agent_name}" }) return result def _save_workflow_results(self, workflow_id: str) -> None: """Speichert die Workflow-Ergebnisse in einer Datei""" workflow = self.workflows.get(workflow_id) if workflow: try: file_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") with open(file_path, 'w', encoding='utf-8') as f: json.dump(workflow, f, indent=2, ensure_ascii=False) logger.info(f"Workflow-Ergebnisse gespeichert: {file_path}") except Exception as e: logger.error(f"Fehler beim Speichern der Workflow-Ergebnisse: {e}") def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]: """Gibt den Status eines Workflows zurück""" workflow = self.workflows.get(workflow_id) if not workflow: return None return { "id": workflow["id"], "status": workflow["status"], "progress": workflow["progress"], "started_at": workflow["started_at"], "completed_at": workflow["completed_at"], "agent_statuses": workflow["agent_statuses"] } def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]: """Gibt die Protokolle eines Workflows zurück""" workflow = self.workflows.get(workflow_id) if not workflow: return None return workflow["logs"] def get_workflow_results(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]: """Gibt die Ergebnisse eines Workflows zurück""" workflow = self.workflows.get(workflow_id) if not workflow: return None return workflow["results"] # Singleton-Instanz des AgentService _agent_service_instance = None def get_agent_service(): """Gibt eine Singleton-Instanz des AgentService zurück""" global _agent_service_instance if _agent_service_instance is None: _agent_service_instance = AgentService() return _agent_service_instance