""" Manager für Workflow-Ausführung im Agentservice. Steuert den gesamten Ablauf eines Workflow-Durchlaufs. Implementiert die neue Workflow-Struktur und Ausführungslogik gemäß den Anforderungen. Unterstützt sowohl neue Workflows als auch die Fortsetzung bestehender Workflows mit Benutzereingaben. Angepasst für die verbesserte Dateibehandlung. """ import os import logging import asyncio import uuid import json from datetime import datetime from typing import List, Dict, Any, Optional, Tuple, Union # Import von Modulen from modules.agentservice_registry import AgentRegistry from modules.agentservice_filehandling import prepare_file_contexts, read_file_contents, extract_files_from_message, add_file_to_message # Import neuer Modellklassen try: from modules.lucydom_model import Message, Workflow, Document, DocumentSource, DocumentContent, DataStats except ImportError: # Fallback-Definitionen class Message(Dict[str, Any]): pass class Workflow(Dict[str, Any]): pass class Document(Dict[str, Any]): pass class DocumentSource(Dict[str, Any]): pass class DocumentContent(Dict[str, Any]): pass class DataStats(Dict[str, Any]): pass logger = logging.getLogger(__name__) registry = AgentRegistry.get_instance() agent = registry.get_agent("user_agent") class WorkflowError(Exception): """Basis-Exception für Workflow-Fehler""" pass class WorkflowNotFoundError(WorkflowError): """Exception wenn ein Workflow nicht gefunden wurde""" pass class WorkflowExecutionError(WorkflowError): """Exception bei der Ausführung eines Workflows""" pass class WorkflowManager: """Manager für die Ausführung von Workflows""" def __init__(self, mandate_id: int = None, user_id: int = None, ai_service = None, lucydom_interface = None): """ Initialisiert den WorkflowManager. Args: mandate_id: ID des Mandanten user_id: ID des Benutzers ai_service: Service für KI-Anfragen lucydom_interface: Interface für Datenbankzugriffe (optional) """ self.mandate_id = mandate_id self.user_id = user_id self.ai_service = ai_service self.lucydom_interface = lucydom_interface # Lade Konfiguration aus config.ini import configload config = configload.load_config() # Verzeichnisse für Ergebnisse und Uploads aus der Konfiguration lesen self.results_dir = config.get('Module_AgentserviceInterface', 'RESULTS_DIR', fallback='results') # Maximale Anzahl an Nachrichten im Verlauf self.max_history = int(config.get('Module_AgentserviceInterface', 'MAX_HISTORY', fallback='20')) # Stelle sicher, dass die Verzeichnisse existieren os.makedirs(self.results_dir, exist_ok=True) # Aktive Workflows self.workflows = {} # Lade aktive Workflows aus der Datenbank, falls verfügbar if self.lucydom_interface: self._load_active_workflows() logger.info(f"WorkflowManager initialisiert mit Mandant {mandate_id}, Benutzer {user_id}") async def execute_workflow( self, message: Dict[str, Any], workflow_id: Optional[str] = None, files: List[Dict[str, Any]] = None, is_user_input: bool = False # Parameter to identify user input ) -> Dict[str, Any]: """ Führt einen Workflow aus, entweder durch Erstellen eines neuen oder Fortsetzen eines bestehenden Workflows mit Benutzereingabe. Args: message: Die Nachricht (Prompt oder Benutzereingabe) workflow_id: Optional ID eines bestehenden Workflows files: Optionale Liste von Dateimetadaten is_user_input: Flag, das anzeigt, ob es sich um eine Benutzereingabe handelt Returns: Dictionary mit Workflow-Status und Ergebnis """ # Add detailed debug logging logger.info(f"execute_workflow called: workflow_id={workflow_id}, is_user_input={is_user_input}, message={message.get('content', '')[:50]}...") # Detailed file logging if files: logger.info(f"Files provided: {len(files)} files") for file in files: file_id = file.get('id', 'unknown') file_name = file.get('name', 'unnamed') file_type = file.get('type', 'unknown') file_content_type = file.get('content_type', 'unknown') logger.info(f"File: {file_name} (ID: {file_id}, Type: {file_type}, Content-Type: {file_content_type})") else: logger.info("No files provided with the message") # 4.1 Unterscheide zwischen neuem Workflow und bestehender Benutzereingabe is_new_workflow = workflow_id is None if is_new_workflow: # Variante (A): Neuen Workflow erstellen workflow_id = f"wf_{uuid.uuid4()}" workflow = self._initialize_workflow(workflow_id) workflow["name"] = message.get("content", "")[:50] # Kurzer Titel aus dem Inhalt workflow["status"] = "running" self._add_log(workflow, "Neuer Workflow gestartet", "info") else: # Variante (B): Bestehenden Workflow laden try: workflow = await self.load_workflow(workflow_id) if not workflow: raise WorkflowNotFoundError(f"Workflow {workflow_id} nicht gefunden") # Wenn der Workflow noch läuft, beende ihn ordnungsgemäß if workflow["status"] == "running": self._add_log(workflow, f"Neuer Benutzereingabe erhalten - Vorherigen Workflow beenden", "info") # Status auf completed setzen, damit er für den neuen Input bereit ist workflow["status"] = "completed" self._add_log(workflow, f"Starte Workflow-Ausführung, Nachrichtenlänge: {len(message.get('content', ''))}, {len(files or [])} Dateien", "info") # Status auf "running" setzen workflow["status"] = "running" workflow["last_activity"] = datetime.now().isoformat() self._add_log(workflow, "Workflow nach Benutzereingabe fortgesetzt", "info") except WorkflowNotFoundError as e: logger.error(f"Workflow nicht gefunden: {str(e)}") return { "workflow_id": workflow_id, "status": "error", "error": f"Workflow nicht gefunden: {workflow_id}" } except WorkflowError as e: logger.error(f"Workflow-Fehler: {str(e)}") return { "workflow_id": workflow_id, "status": "error", "error": str(e) } logger.debug(f"Workflow initialisiert: {workflow_id}, Status: {workflow['status']}") try: # 4.2 Message-Initialisierung # Letztes Message-Objekt abschließen (falls vorhanden) if "messages" in workflow and workflow["messages"]: self._finalize_last_message(workflow) # Neues Message-Objekt erstellen new_message = self._create_message(workflow_id, message.get("role", "user")) new_message["content"] = message.get("content", "") # Log the message creation logger.info(f"Created new message with ID {new_message['id']} and content: {new_message['content'][:50]}...") # 4.3 Dateivorbereitung if files and len(files) > 0: # Add detailed logging logger.info(f"Processing {len(files)} files for message {new_message['id']}") for f in files: logger.info(f"Processing file: {f.get('name', 'unknown')} (ID: {f.get('id', 'unknown')})") # Dateikontexte vorbereiten - enthält nur Metadaten file_contexts = prepare_file_contexts(files) self._add_log(workflow, f"{len(files)} Dateien werden verarbeitet", "info") # Dateiinhalte lesen und zum Message-Objekt hinzufügen # LucyDOM-Interface wird für Dateizugriffe genutzt file_contents = await read_file_contents( file_contexts, self.lucydom_interface, workflow_id, self._add_log, self.ai_service ) logger.debug(f"Dateien geladen für Workflow {workflow_id}: {file_contents.keys()}") for file_id, content in file_contents.items(): file_metadata = next((f for f in files if f.get('id') == file_id), {}) file_data = { "id": file_id, "name": file_metadata.get('name', next((f.get('name', 'unnamed_file') for f in file_contexts if f.get('id') == file_id), 'unnamed_file')), "content_type": file_metadata.get('content_type', next((f.get('content_type') for f in file_contexts if f.get('id') == file_id), None)), "type": file_metadata.get('type', next((f.get('type') for f in file_contexts if f.get('id') == file_id), "unknown")), "content": content, "size": file_metadata.get('size') } logger.info(f"Adding file {file_data['name']} (ID: {file_id}) to message {new_message['id']}") try: # Add file to message and check document count before and after doc_count_before = len(new_message.get("documents", [])) new_message = add_file_to_message(new_message, file_data) doc_count_after = len(new_message.get("documents", [])) if doc_count_after > doc_count_before: logger.info(f"File successfully added to message. Document count: {doc_count_after}") else: logger.warning(f"File may not have been added to message properly. Document count unchanged: {doc_count_before}") except Exception as e: logger.error(f"Error adding file to message: {str(e)}") self._add_log(workflow, f"Fehler beim Hinzufügen der Datei {file_data['name']}: {str(e)}", "error") # Message zum Workflow hinzufügen if "messages" not in workflow: workflow["messages"] = [] # Log the message document count before adding to workflow logger.info(f"Adding message with {len(new_message.get('documents', []))} documents to workflow {workflow_id}") workflow["messages"].append(new_message) # Immediately save workflow to persist file attachments self._save_workflow(workflow) logger.info(f"Saved workflow state after adding message with {len(new_message.get('documents', []))} documents") # 4.4 Agent-Initialisierung agents = registry.initialize_agents_for_workflow() # 4.5 Moderator-Entscheidung (mit OpenAI API) agent_tasks = await self._decide_agent_tasks(new_message, agents) # Speichere den aktuellen Zwischenstand self._save_workflow(workflow) # Nach Agenten-Entscheidung self._add_log(workflow, f"Agenten-Entscheidung abgeschlossen: {len(agent_tasks)} Aufgaben zugewiesen", "info") logger.debug(f"Agent-Tasks für Workflow {workflow_id}: {[task['agent_id'] for task in agent_tasks]}") for task in agent_tasks: self._add_log(workflow, f"Agent {task['agent_id']} wurde ausgewählt mit Aufgabe: {task['prompt'][:50]}...", "info") # 4.6 Agent-Ausführung # Filtern der Aufgaben nach Typ system_agent_tasks = [task for task in agent_tasks if task["agent_id"] != "user_agent"] user_agent_task = next((task for task in agent_tasks if task["agent_id"] == "user_agent"), None) # 1. System-Agenten ausführen, falls vorhanden agent_results = [] last_result = None if system_agent_tasks: self._add_log(workflow, f"{len(system_agent_tasks)} System-Agenten werden ausgeführt", "info") for task in system_agent_tasks: agent_id = task["agent_id"] agent_prompt = task["prompt"] # Wenn ein vorheriges Ergebnis existiert, in den Prompt einbinden if last_result: agent_prompt = f"{agent_prompt}\n\nVorheriges Ergebnis: {last_result}" self._add_log(workflow, f"Agent {agent_id} wird ausgeführt", "info") # Agenten ausführen agent_result = await self._execute_agent(workflow, agent_id, agent_prompt) if agent_result: agent_results.append(agent_result) last_result = agent_result.get("content", "") # 2. Immer den User-Agent aufrufen, entweder mit dem Moderator-Prompt oder einem generischen # Prompt basierend auf den Ergebnissen der System-Agenten if user_agent_task: # Verwende den spezifischen Prompt vom Moderator user_prompt = user_agent_task["prompt"] else: # Erstelle einen generischen Prompt basierend auf den System-Agent-Ergebnissen if agent_results: # Wenn System-Agenten ausgeführt wurden, fasse ihre Ergebnisse zusammen summary = await self._create_summary(agent_results) user_prompt = f"Die Agenten haben ihre Aufgaben abgeschlossen. Hier ist eine Zusammenfassung der Ergebnisse:\n\n{summary}\n\nBenötigen Sie weitere Informationen oder haben Sie Fragen dazu?" else: # Wenn keine System-Agenten ausgeführt wurden user_prompt = "Die Agenten haben keine spezifischen Aufgaben durchgeführt. Gibt es etwas, wobei ich Ihnen helfen kann?" # 3. User-Agent-Nachricht erstellen und zum Workflow hinzufügen workflow["status"] = "completed" # Workflow is complete, ready for new prompt user_message = { "role": "assistant", "content": f"[Moderator zu User Agent] {user_prompt}", "agent_type": "moderator", "agent_id": "moderator", "agent_name": "Moderator", "workflow_complete": True # Signal completion instead of waiting } # Nachricht zum Workflow hinzufügen workflow["messages"].append(user_message) # Log-Eintrag self._add_log(workflow, f"Workflow wartet auf Benutzereingabe: {user_prompt[:50]}...", "info") # Workflow speichern self._save_workflow(workflow) # Fertig - Backend wartet jetzt auf nächsten API-Call vom Frontend return { "workflow_id": workflow_id, "status": "completed", "messages": workflow.get("messages", []) } except Exception as e: # Fehlerbehandlung workflow["status"] = "failed" self._add_log(workflow, f"Fehler bei der Workflow-Ausführung: {str(e)}", "error") self._save_workflow(workflow) logger.error(f"Fehler bei der Workflow-Ausführung: {str(e)}", exc_info=True) return { "workflow_id": workflow_id, "status": "failed", "error": str(e) } def _load_active_workflows(self): """Lädt aktive Workflows aus der Datenbank""" try: if not self.lucydom_interface: return # Aktive Workflows für den aktuellen Benutzer abrufen user_workflows = self.lucydom_interface.get_workflows_by_user(self.user_id) active_workflows = [wf for wf in user_workflows if wf.get("status") in ["running", "completed"]] # Aktive Workflows in den Speicher laden for workflow_base in active_workflows: workflow_id = workflow_base.get("id") if not workflow_id: continue # Vollständigen Workflow-Zustand laden workflow = self.lucydom_interface.load_workflow_state(workflow_id) if workflow: self.workflows[workflow_id] = workflow logger.info(f"Aktiven Workflow {workflow_id} aus Datenbank geladen") except Exception as e: logger.error(f"Fehler beim Laden der aktiven Workflows: {str(e)}") def _save_workflow(self, workflow: Dict[str, Any]) -> None: """ Speichert den Workflow in der Datenbank und als Datei. Args: workflow: Das zu speichernde Workflow-Objekt """ workflow_id = workflow.get("id") # In der Datenbank speichern, falls verfügbar if self.lucydom_interface: try: success = self.lucydom_interface.save_workflow_state(workflow) if success: logger.debug(f"Workflow {workflow_id} in Datenbank gespeichert") else: logger.warning(f"Workflow {workflow_id} konnte nicht in Datenbank gespeichert werden") except Exception as e: logger.error(f"Fehler beim Speichern des Workflows {workflow_id} in Datenbank: {str(e)}") # Als Datei speichern (Backup/Fallback) workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") try: with open(workflow_path, 'w', encoding='utf-8') as f: json.dump(workflow, f, indent=2, ensure_ascii=False) logger.debug(f"Workflow {workflow_id} als Datei gespeichert: {workflow_path}") except Exception as e: logger.error(f"Fehler beim Speichern des Workflows {workflow_id} als Datei: {str(e)}") async def load_workflow(self, workflow_id: str) -> Optional[Dict[str, Any]]: """ Lädt einen Workflow aus der Datenbank oder Datei. Args: workflow_id: ID des Workflows Returns: Das geladene Workflow-Objekt oder None, wenn der Workflow nicht existiert """ # Prüfen, ob der Workflow bereits im Speicher ist if workflow_id in self.workflows: return self.workflows[workflow_id] # Versuche, den Workflow aus der Datenbank zu laden if self.lucydom_interface: try: workflow = self.lucydom_interface.load_workflow_state(workflow_id) if workflow: # Workflow im Speicher cachen self.workflows[workflow_id] = workflow logger.info(f"Workflow {workflow_id} aus Datenbank geladen") return workflow except Exception as e: logger.error(f"Fehler beim Laden des Workflows {workflow_id} aus Datenbank: {str(e)}") # Versuche, den Workflow aus der Datei zu laden workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") try: if os.path.exists(workflow_path): with open(workflow_path, 'r', encoding='utf-8') as f: workflow = json.load(f) # Workflow im Speicher cachen self.workflows[workflow_id] = workflow # Optional: In Datenbank speichern, falls verfügbar if self.lucydom_interface: try: self.lucydom_interface.save_workflow_state(workflow) logger.info(f"Workflow {workflow_id} in Datenbank gespeichert nach Laden aus Datei") except Exception as e: logger.warning(f"Fehler beim Speichern des Workflows {workflow_id} in Datenbank nach Laden aus Datei: {str(e)}") logger.info(f"Workflow {workflow_id} aus Datei geladen: {workflow_path}") return workflow else: logger.warning(f"Workflow {workflow_id} nicht gefunden: {workflow_path}") raise WorkflowNotFoundError(f"Workflow {workflow_id} nicht gefunden") except WorkflowNotFoundError: raise except Exception as e: logger.error(f"Fehler beim Laden des Workflows {workflow_id} aus Datei: {str(e)}") raise WorkflowError(f"Fehler beim Laden des Workflows: {str(e)}") async def list_workflows(self, mandate_id: int = None, user_id: int = None) -> List[Dict[str, Any]]: """ Listet alle verfügbaren Workflows auf. Args: mandate_id: Optionale Mandanten-ID für die Filterung user_id: Optionale Benutzer-ID für die Filterung Returns: Liste von Workflow-Zusammenfassungen """ workflows = [] # Aus Datenbank laden, falls verfügbar if self.lucydom_interface: try: # Alle Workflows des Benutzers abrufen if user_id is not None: user_workflows = self.lucydom_interface.get_workflows_by_user(user_id) else: user_workflows = self.lucydom_interface.get_all_workflows() # Nach Mandanten filtern, falls angegeben if mandate_id is not None: user_workflows = [wf for wf in user_workflows if wf.get("mandate_id") == mandate_id] # Workflow-Zusammenfassungen erstellen for workflow in user_workflows: summary = { "id": workflow.get("id"), "name": workflow.get("name", f"Workflow {workflow.get('id')}"), "status": workflow.get("status"), "started_at": workflow.get("started_at"), "last_activity": workflow.get("last_activity"), "completed_at": workflow.get("completed_at") } # Nachrichtenanzahl hinzufügen, falls verfügbar messages = self.lucydom_interface.get_workflow_messages(workflow.get("id")) if messages: summary["message_count"] = len(messages) workflows.append(summary) logger.info(f"Workflows aus Datenbank geladen: {len(workflows)}") # Nach letzter Aktivität sortieren (neueste zuerst) return sorted(workflows, key=lambda w: w.get("last_activity", ""), reverse=True) except Exception as e: logger.error(f"Fehler beim Abrufen der Workflows aus Datenbank: {str(e)}") # Aus Dateien laden, wenn keine Datenbank verfügbar oder ein Fehler aufgetreten ist try: for filename in os.listdir(self.results_dir): if filename.startswith("workflow_") and filename.endswith(".json"): workflow_path = os.path.join(self.results_dir, filename) try: with open(workflow_path, 'r', encoding='utf-8') as f: workflow = json.load(f) # Prüfen, ob Mandanten- und Benutzer-ID übereinstimmen if mandate_id is not None and workflow.get("mandate_id") != mandate_id: continue if user_id is not None and workflow.get("user_id") != user_id: continue # Workflow-Zusammenfassung erstellen summary = { "id": workflow.get("id"), "name": workflow.get("name", f"Workflow {workflow.get('id')}"), "status": workflow.get("status"), "started_at": workflow.get("started_at"), "last_activity": workflow.get("last_activity"), "message_count": len(workflow.get("messages", [])) } workflows.append(summary) except Exception as e: logger.error(f"Fehler beim Laden der Workflow-Datei {filename}: {str(e)}") logger.info(f"Workflows aus Dateien geladen: {len(workflows)}") # Nach letzter Aktivität sortieren (neueste zuerst) return sorted(workflows, key=lambda w: w.get("last_activity", ""), reverse=True) except Exception as e: logger.error(f"Fehler beim Auflisten der Workflows: {str(e)}") return [] async def delete_workflow(self, workflow_id: str) -> bool: """ Löscht einen Workflow. Args: workflow_id: ID des Workflows Returns: True bei Erfolg, False wenn der Workflow nicht existiert """ # Aus dem Speicher entfernen if workflow_id in self.workflows: del self.workflows[workflow_id] # Aus der Datenbank löschen if self.lucydom_interface: try: db_success = self.lucydom_interface.delete_workflow(workflow_id) logger.info(f"Workflow {workflow_id} aus Datenbank gelöscht: {db_success}") except Exception as e: logger.error(f"Fehler beim Löschen des Workflows {workflow_id} aus Datenbank: {str(e)}") # Datei löschen workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") try: if os.path.exists(workflow_path): os.remove(workflow_path) logger.info(f"Workflow {workflow_id} aus Datei gelöscht: {workflow_path}") return True else: logger.warning(f"Workflow {workflow_id} nicht gefunden: {workflow_path}") return False except Exception as e: logger.error(f"Fehler beim Löschen der Workflow-Datei {workflow_id}: {str(e)}") return False def _initialize_workflow(self, workflow_id: str) -> Dict[str, Any]: """ Initialisiert einen neuen Workflow und speichert ihn in der Datenbank. Args: workflow_id: ID des Workflows Returns: Das initialisierte Workflow-Objekt """ current_time = datetime.now().isoformat() # Vollständiges Workflow-Objekt gemäß dem Datenmodell erstellen workflow = { "id": workflow_id, "name": f"Workflow {workflow_id}", "mandate_id": self.mandate_id, "user_id": self.user_id, "status": "running", "started_at": current_time, "last_activity": current_time, "current_round": 1, # Vollständige Statistik-Struktur gemäß DataStats-Modell "data_stats": { "total_processing_time": 0.0, "total_token_count": 0, "total_bytes_sent": 0, "total_bytes_received": 0 }, # Leere Arrays für Nachrichten und Logs "messages": [], "logs": [] } print("DEBUG Init workflow") # Log-Eintrag für den Start des Workflows self._add_log(workflow, "Workflow gestartet", "info") # Workflow in Datenbank speichern if self.lucydom_interface: try: # Direktes Speichern des vollständigen Workflow-Objekts self.lucydom_interface.save_workflow_state(workflow) logger.info(f"Workflow {workflow_id} in Datenbank erstellt") except Exception as e: logger.error(f"Fehler beim Erstellen des Workflows {workflow_id} in Datenbank: {str(e)}") # Workflow im Speicher cachen self.workflows[workflow_id] = workflow return workflow async def stop_workflow(self, workflow_id: str) -> bool: """ Stoppt einen laufenden Workflow. Args: workflow_id: ID des zu stoppenden Workflows Returns: True bei Erfolg, False wenn der Workflow nicht existiert oder bereits beendet wurde """ try: workflow = self.workflows.get(workflow_id) if not workflow: # Versuche den Workflow zu laden workflow = await self.load_workflow(workflow_id) if not workflow: return False # Wenn der Workflow nicht im Status 'running' oder 'completed' ist, beenden if workflow.get("status") not in ["running", "completed"]: return False # Status auf 'stopped' setzen workflow["status"] = "stopped" workflow["last_activity"] = datetime.now().isoformat() self._add_log(workflow, "Workflow wurde manuell gestoppt", "info") # Workflow speichern self._save_workflow(workflow) return True except Exception as e: logger.error(f"Fehler beim Stoppen des Workflows {workflow_id}: {str(e)}") return False async def _decide_agent_tasks(self, message: Dict[str, Any], agents: Dict[str, Dict[str, Any]]) -> List[Dict[str, Any]]: """ Entscheidet anhand der Nachricht und Agentenprofile, welche Agenten für welche Aufgaben eingesetzt werden sollen. Args: message: Das zu verarbeitende Message-Objekt agents: Verfügbare Agenten mit ihren Profilen Returns: Liste mit Aufgaben für Agenten (agent_id, prompt) """ try: # Erstelle einen Prompt für den OpenAI-Call agent_descriptions = [] for agent_id, agent in agents.items(): agent_descriptions.append(f"ID: {agent_id}, Name: {agent['name']}, Typ: {agent['type']}, Beschreibung: {agent['description']}, Fähigkeiten: {agent['capabilities']}") agent_description_text = "\n".join(agent_descriptions) # Nachrichteninhalt extrahieren content = message.get("content", "") # Dateien aus der Nachricht extrahieren files = extract_files_from_message(message) file_descriptions = [] for file in files: file_descriptions.append(f"Name: {file.get('name', '')}, Typ: {file.get('content_type', '')}, Inhalt: {file.get('content', '')[:200]}...") file_description_text = "\n".join(file_descriptions) if file_descriptions else "Keine Dateien" # Prompt für den OpenAI-Call erstellen decision_prompt = f""" Du bist ein Workflow-Manager, der entscheidet, welche Agenten für eine Anfrage eingesetzt werden sollen. VERFÜGBARE AGENTEN: {agent_description_text} BENUTZERANFRAGE: {content} DATEIEN: {file_description_text} ANWEISUNGEN: 1. Analysiere die Benutzeranfrage und die Dateien 2. Entscheide, welche Agenten eingesetzt werden sollen 3. Die Agenten müssen entweder NUR der User-Agent ODER NUR System-Agenten sein 4. Definiere für jeden Agenten einen spezifischen Prompt basierend auf seinen Fähigkeiten 5. Gib das Ergebnis als JSON-Array von Objekten zurück Antwortformat: [ {{"agent_id": "agent_id_1", "prompt": "Aufgabenbeschreibung für Agent 1"}}, {{"agent_id": "agent_id_2", "prompt": "Aufgabenbeschreibung für Agent 2"}} ] Wenn keine Agenten erforderlich sind (z.B. bei unklaren Anfragen), gib ein leeres Array zurück: [] """ # OpenAI-Call durchführen content = await self.ai_service.call_api([{"role": "user", "content": decision_prompt}]) # Versuche, JSON zu parsen import json import re # Suche nach JSON-Objekten in der Antwort json_match = re.search(r'\[\s*{.*}\s*\]', content, re.DOTALL) if json_match: json_str = json_match.group(0) agent_tasks = json.loads(json_str) # Validiere die Struktur for task in agent_tasks: if "agent_id" not in task or "prompt" not in task: raise ValueError("Ungültiges Format: Jede Aufgabe muss agent_id und prompt enthalten") # Prüfe, ob nur User oder nur System-Agenten ausgewählt wurden agent_types = [agents[task["agent_id"]]["type"] for task in agent_tasks if task["agent_id"] in agents] if "user" in agent_types and len(agent_types) > 1: # Wenn der User-Agent und andere Agenten ausgewählt wurden, nur den User-Agenten behalten agent_tasks = [task for task in agent_tasks if agents[task["agent_id"]]["type"] == "user"] logger.debug(f"Ausgewählte Agenten-Tasks: {agent_tasks}") return agent_tasks else: # Kein JSON gefunden, leere Liste zurückgeben return [] except Exception as e: logger.error(f"Fehler bei der Agent-Auswahl: {str(e)}", exc_info=True) return [] async def _execute_agent(self, workflow: Dict[str, Any], agent_id: str, prompt: str) -> Optional[Dict[str, Any]]: """ Führt einen Agenten mit einem spezifischen Prompt aus. Args: workflow: Das Workflow-Objekt agent_id: ID des auszuführenden Agenten prompt: Prompt für den Agenten Returns: Das Ergebnis des Agenten oder None bei Fehlern """ try: # Agenten-Instanz holen registry = AgentRegistry.get_instance() agent = registry.get_agent(agent_id) if not agent: self._add_log(workflow, f"Agent '{agent_id}' nicht gefunden", "error") return None # Message-Objekt für den Agenten erstellen agent_message = { "role": "user", "content": prompt, "workflow_id": workflow["id"] } # Agenten ausführen self._add_log(workflow, f"Agent '{agent_id}' wird ausgeführt", "info") result = await agent.process_message(agent_message, workflow) # Agenten-Antwort als neue Nachricht zum Workflow hinzufügen agent_response_message = self._create_message(workflow["id"], "assistant") agent_response_message["content"] = result.get("content", "") agent_response_message["agent_type"] = agent.type agent_response_message["agent_id"] = agent_id agent_response_message["agent_name"] = agent.name # Nachricht zum Workflow hinzufügen workflow["messages"].append(agent_response_message) # Nachricht abschließen und in der Datenbank speichern self._finalize_last_message(workflow) # Workflow-Zustand speichern self._save_workflow(workflow) # Ergebnis formatieren und zurückgeben agent_result = { "agent_id": agent_id, "agent_name": agent.name, "content": result.get("content", ""), "agent_type": agent.type } self._add_log(workflow, f"Agent '{agent_id}' hat geantwortet", "info") return agent_result except Exception as e: self._add_log(workflow, f"Fehler bei der Ausführung von Agent '{agent_id}': {str(e)}", "error") return None async def _create_summary(self, agent_results: List[Dict[str, Any]]) -> str: """ Erstellt eine Zusammenfassung der Agentenergebnisse. Args: agent_results: Liste der Agentenergebnisse Returns: Zusammenfassung als Text """ if not agent_results: return "Keine Agentenergebnisse verfügbar." # Kombiniere die Ergebnisse in einen Kontext context = "" for result in agent_results: agent_name = result.get("agent_name", "Unbekannter Agent") content = result.get("content", "") context += f"--- {agent_name} ---\n{content}\n\n" # Prompt für die Zusammenfassung summary_prompt = f""" Erstelle eine aussagekräftige Zusammenfassung der folgenden Agentenergebnisse. Organisiere die Informationen strukturiert und vermeide Redundanzen. Behalte alle wichtigen Erkenntnisse und Empfehlungen bei. {context} """ # OpenAI-Call für die Zusammenfassung try: summary = await self.ai_service.call_api([{"role": "user", "content": summary_prompt}]) return summary except Exception as e: logger.error(f"Fehler bei der Erstellung der Zusammenfassung: {str(e)}") return "Fehler bei der Erstellung der Zusammenfassung. Bitte die individuellen Agentenergebnisse beachten." def _add_log(self, workflow: Dict[str, Any], message: str, log_type: str, agent_id: Optional[str] = None, agent_name: Optional[str] = None) -> None: """ Fügt einen Log-Eintrag zum Workflow hinzu und speichert ihn in der Datenbank. """ # First, check if workflow is a string (ID) instead of dictionary if isinstance(workflow, str): # Try to load the workflow by ID workflow_id = workflow workflow = self.workflows.get(workflow_id) if not workflow: # Just log to the logger and return logger.info(f"Log (couldn't add to workflow {workflow_id}): {log_type} - {message}") return # Check if workflow is a dictionary if not isinstance(workflow, dict): logger.error(f"Invalid workflow type: {type(workflow)}. Expected dictionary.") # Just log to the logger and return logger.info(f"Log (couldn't add to workflow): {log_type} - {message}") return # Continue with the rest of the function if workflow is a dictionary log_entry = { "id": f"log_{uuid.uuid4()}", "message": message, "type": log_type, "timestamp": datetime.now().isoformat(), "agent_id": agent_id, "agent_name": agent_name } # Log-Eintrag zum Workflow hinzufügen if "logs" not in workflow: workflow["logs"] = [] workflow["logs"].append(log_entry) # Letzte Aktivität aktualisieren workflow["last_activity"] = log_entry["timestamp"] # Log-Eintrag in Datenbank speichern, falls verfügbar if self.lucydom_interface: try: # Workflow-ID zum Log-Eintrag hinzufügen log_data = log_entry.copy() log_data["workflow_id"] = workflow["id"] self.lucydom_interface.create_workflow_log(log_data) logger.debug(f"Log-Eintrag für Workflow {workflow['id']} in Datenbank gespeichert") except Exception as e: logger.error(f"Fehler beim Speichern des Log-Eintrags für Workflow {workflow['id']} in Datenbank: {str(e)}") logger.info(f"Workflow {workflow['id']}: {message}") def _create_message(self, workflow_id: str, role: str = "system", parent_message_id: str = None) -> Dict[str, Any]: """ Erstellt ein neues Message-Objekt und speichert es in der Datenbank. Args: workflow_id: ID des Workflows role: Rolle der Nachricht ('system', 'user', 'assistant') parent_message_id: ID der Elternnachricht (optional) Returns: Das erstellte Message-Objekt """ workflow = self.workflows.get(workflow_id) # Sequence-Nummer bestimmen sequence_no = 1 if workflow and workflow.get("messages"): sequence_no = len(workflow["messages"]) + 1 # Aktuelle Zeit current_time = datetime.now().isoformat() # Ensure a unique ID for the message message_id = f"msg_{uuid.uuid4()}" # Message-Objekt erstellen message = { "id": message_id, "workflow_id": workflow_id, "parent_message_id": parent_message_id, "started_at": current_time, "finished_at": None, "sequence_no": sequence_no, "status": "pending", "role": role, "data_stats": { "processing_time": 0.0, "token_count": 0, "bytes_sent": 0, "bytes_received": 0 }, "documents": [], # Initialize empty documents array "content": None, "agent_type": None } # In Datenbank speichern, falls verfügbar if self.lucydom_interface: try: # Include all fields in the database version message_data = { "id": message_id, "workflow_id": workflow_id, "sequence_no": sequence_no, "role": role, "content": None, "agent_type": None, "created_at": current_time, # IMPORTANT: Include documents field "documents": [] } # Log the message creation logger.debug(f"Creating new message in database: {message_data}") result = self.lucydom_interface.create_workflow_message(message_data) if result: logger.debug(f"Nachricht für Workflow {workflow_id} in Datenbank erstellt mit ID: {message_id}") else: logger.warning(f"Fehler beim Erstellen der Nachricht für Workflow {workflow_id} in Datenbank") except Exception as e: logger.error(f"Fehler beim Erstellen der Nachricht für Workflow {workflow_id} in Datenbank: {str(e)}") return message def _finalize_last_message(self, workflow: Dict[str, Any]) -> None: """ Schließt die letzte Nachricht im Workflow ab und aktualisiert sie in der Datenbank. Args: workflow: Das Workflow-Objekt """ if not workflow.get("messages"): return last_message = workflow["messages"][-1] if last_message.get("finished_at") is None: last_message["finished_at"] = datetime.now().isoformat() last_message["status"] = "completed" # In Datenbank aktualisieren, falls verfügbar if self.lucydom_interface: try: message_id = last_message.get("id") if not message_id: logger.warning(f"Keine ID für letzte Nachricht in Workflow {workflow['id']} gefunden") return # Only extract fields that are expected in the database model # Make sure all required fields have values with proper defaults message_data = { "id": message_id, "workflow_id": workflow.get("id", ""), "sequence_no": last_message.get("sequence_no", 0), "role": last_message.get("role", "unknown"), "content": last_message.get("content", ""), "agent_type": last_message.get("agent_type", ""), "created_at": last_message.get("started_at", datetime.now().isoformat()), # IMPORTANT: Include the documents array "documents": last_message.get("documents", []) } # Log the message data for debugging logger.debug(f"Updating message in database with data: {message_data}") # Nachricht in Datenbank aktualisieren self.lucydom_interface.update_workflow_message(message_id, message_data) logger.debug(f"Nachricht {message_id} für Workflow {workflow['id']} in Datenbank aktualisiert (mit Dokumenten)") except Exception as e: logger.error(f"Fehler beim Aktualisieren der Nachricht für Workflow {workflow['id']} in Datenbank: {str(e)}") def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]: """ Gibt den Status eines Workflows zurück. Args: workflow_id: ID des Workflows Returns: Dictionary mit Status-Informationen oder None, wenn der Workflow nicht existiert """ # Aus dem Speicher abrufen workflow = self.workflows.get(workflow_id) # Falls nicht im Speicher, aus der Datenbank oder Datei laden if not workflow: # Aus Datenbank laden, falls verfügbar if self.lucydom_interface: try: workflow_data = self.lucydom_interface.get_workflow(workflow_id) if workflow_data: workflow = workflow_data except Exception as e: logger.error(f"Fehler beim Laden des Workflow-Status aus Datenbank: {str(e)}") # Falls nicht in der Datenbank, aus Datei laden if not workflow: try: workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") if os.path.exists(workflow_path): with open(workflow_path, 'r', encoding='utf-8') as f: workflow = json.load(f) except Exception as e: logger.error(f"Fehler beim Laden des Workflow-Status aus Datei: {str(e)}") return None if not workflow: return None # Status-Informationen extrahieren status_info = { "id": workflow.get("id"), "name": workflow.get("name", f"Workflow {workflow_id}"), "status": workflow.get("status"), "progress": 1.0 if workflow.get("status") in ["completed", "failed", "stopped"] else 0.5, "started_at": workflow.get("started_at"), "last_activity": workflow.get("last_activity"), "workflow_complete": workflow.get("status") == "completed", # Add this instead "current_round": workflow.get("current_round", 1), "data_stats": workflow.get("data_stats", { "total_processing_time": 0.0, "total_token_count": 0, "total_bytes_sent": 0, "total_bytes_received": 0 }) } return status_info def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]: """ Gibt die Logs eines Workflows zurück. Args: workflow_id: ID des Workflows Returns: Liste der Logs oder None, wenn der Workflow nicht existiert """ # Aus dem Speicher abrufen workflow = self.workflows.get(workflow_id) # Falls nicht im Speicher, aus der Datenbank laden if not workflow and self.lucydom_interface: try: logs = self.lucydom_interface.get_workflow_logs(workflow_id) return logs except Exception as e: logger.error(f"Fehler beim Laden der Workflow-Logs aus Datenbank: {str(e)}") # Falls nicht in der Datenbank oder kein Interface verfügbar, aus Datei laden if not workflow: try: workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") if os.path.exists(workflow_path): with open(workflow_path, 'r', encoding='utf-8') as f: workflow = json.load(f) except Exception as e: logger.error(f"Fehler beim Laden der Workflow-Logs aus Datei: {str(e)}") return None return workflow.get("logs", []) if workflow else None def get_workflow_messages(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]: """ Gibt die Nachrichten eines Workflows zurück. Args: workflow_id: ID des Workflows Returns: Liste der Nachrichten oder None, wenn der Workflow nicht existiert """ # Aus dem Speicher abrufen workflow = self.workflows.get(workflow_id) # Falls nicht im Speicher, aus der Datenbank laden if not workflow and self.lucydom_interface: try: messages = self.lucydom_interface.get_workflow_messages(workflow_id) return messages except Exception as e: logger.error(f"Fehler beim Laden der Workflow-Nachrichten aus Datenbank: {str(e)}") # Falls nicht in der Datenbank oder kein Interface verfügbar, aus Datei laden if not workflow: try: workflow_path = os.path.join(self.results_dir, f"workflow_{workflow_id}.json") if os.path.exists(workflow_path): with open(workflow_path, 'r', encoding='utf-8') as f: workflow = json.load(f) except Exception as e: logger.error(f"Fehler beim Laden der Workflow-Nachrichten aus Datei: {str(e)}") return None return workflow.get("messages", []) if workflow else None # Anpassen der Factory-Funktion für den WorkflowManager def get_workflow_manager(mandate_id: int = None, user_id: int = None, ai_service = None): """ Gibt eine WorkflowManager-Instanz für den angegebenen Kontext zurück. Wiederverwendet bestehende Instanzen. Args: mandate_id: ID des Mandanten user_id: ID des Benutzers ai_service: Service für KI-Anfragen Returns: Eine WorkflowManager-Instanz """ from modules.lucydom_interface import get_lucydom_interface context_key = f"{mandate_id}_{user_id}" # LucyDOM-Interface für Datenbankzugriffe lucydom_interface = get_lucydom_interface(mandate_id, user_id) if context_key not in _workflow_managers: _workflow_managers[context_key] = WorkflowManager( mandate_id, user_id, ai_service, lucydom_interface ) # Aktualisiere die Services, falls sie geändert wurden if ai_service is not None: _workflow_managers[context_key].ai_service = ai_service return _workflow_managers[context_key] # Singleton-Factory für WorkflowManager-Instanzen pro Kontext _workflow_managers = {}