""" Registry für alle verfügbaren Agenten im System. Verwaltet die Agenten-Instanzen und stellt sie für den Workflow zur Verfügung. """ import logging import importlib from typing import Dict, Any, List, Optional # Import direkt bekannter Agent-Module # Andere Module werden dynamisch importiert from modules.agentservice_base import BaseAgent logger = logging.getLogger(__name__) class AgentRegistry: """Registry für alle verfügbaren Agenten im System""" _instance = None @classmethod def get_instance(cls): """Gibt eine Singleton-Instanz der Agent-Registry zurück""" if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): """Initialisiert die Agent-Registry""" if AgentRegistry._instance is not None: raise RuntimeError("Singleton-Instanz existiert bereits - nutze get_instance()") self.agents = {} self._load_agents() def _load_agents(self): """Lädt alle verfügbaren Agenten""" # Liste aller zu ladenden Agenten-Module agent_modules = [ "agentservice_agent_user", "agentservice_agent_coder", "agentservice_agent_analyst", "agentservice_agent_webcrawler", "agentservice_agent_sharepoint", "agentservice_agent_documentation" ] for module_name in agent_modules: try: # Importiere das Modul try: module = importlib.import_module(f"modules.{module_name}") except ImportError: module = importlib.import_module(module_name) # Suche nach der Agent-Klasse statt nach getter-Funktion agent_type = module_name.split('_')[-1] class_name = f"{agent_type.capitalize()}Agent" if hasattr(module, class_name): # Instanziiere den Agenten direkt agent_class = getattr(module, class_name) agent = agent_class() # Registriere den Agenten self.register_agent(agent) logger.info(f"Agent '{agent.name}' (Typ: {agent.type}) wurde geladen") else: logger.warning(f"Keine {class_name} Klasse in Modul {module_name} gefunden") except ImportError as e: logger.warning(f"Modul {module_name} konnte nicht importiert werden: {e}") except Exception as e: logger.error(f"Fehler beim Laden des Agenten aus Modul {module_name}: {e}") def register_agent(self, agent: BaseAgent): """Registriert einen Agenten in der Registry.""" agent_type = agent.type self.agents[agent_type] = agent # Zusätzlich nach ID registrieren self.agents[agent.id] = agent logger.debug(f"Agent '{agent.name}' (Typ: {agent_type}) wurde registriert") def get_agent(self, agent_identifier: str) -> Optional[BaseAgent]: """ Gibt eine Instanz eines Agenten nach ID oder Typ zurück. Args: agent_identifier: ID oder Typ des gewünschten Agenten Returns: Agent-Instanz oder None, wenn nicht gefunden """ # Versuche, direkt nach Typ zu finden if agent_identifier in self.agents: return self.agents[agent_identifier] # Wenn nicht gefunden, versuche verschiedene Varianten des Namens variants = [ agent_identifier, agent_identifier.replace('_agent', ''), f"{agent_identifier}_agent" ] for variant in variants: if variant in self.agents: return self.agents[variant] logger.warning(f"Agent mit Identifier '{agent_identifier}' nicht gefunden") return None def get_all_agents(self) -> Dict[str, BaseAgent]: """Gibt alle registrierten Agenten zurück.""" return self.agents def get_agent_infos(self) -> List[Dict[str, Any]]: """Gibt Informationen zu allen registrierten Agenten zurück.""" agent_infos = [] # Nur einmal pro Agent-Instanz (da wir sowohl nach Typ als auch nach ID registrieren) seen_agents = set() for agent in self.agents.values(): if agent not in seen_agents: agent_infos.append(agent.get_agent_info()) seen_agents.add(agent) return agent_infos def initialize_agents_for_workflow(self) -> Dict[str, Dict[str, Any]]: """Initialisiert Agenten für einen Workflow.""" initialized_agents = {} seen_agents = set() for agent in self.agents.values(): if agent not in seen_agents: agent_info = agent.get_agent_info() agent_id = agent_info["id"] initialized_agents[agent_id] = agent_info seen_agents.add(agent) return initialized_agents