gateway/modules/chat_registry.py
2025-04-21 17:44:28 +02:00

208 lines
No EOL
7.5 KiB
Python

"""
Chat Agent Registry Module.
Provides a central registry system for all available agents.
Optimized for the standardized task processing pattern.
"""
import os
import logging
import importlib
import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional
logger = logging.getLogger(__name__)
class AgentBase:
"""
Base class for all chat agents.
Defines the standardized interface for task processing.
"""
def __init__(self):
"""Initialize the base agent."""
self.name = "base-agent"
self.description = "Basic agent functionality"
self.capabilities = []
self.ai_service = None
def set_dependencies(self, ai_service=None):
"""Set external dependencies for the agent."""
self.ai_service = ai_service
def get_agent_info(self) -> Dict[str, Any]:
"""
Return standardized information about the agent's capabilities.
Returns:
Dictionary with name, description, and capabilities
"""
return {
"name": self.name,
"description": self.description,
"capabilities": self.capabilities
}
async def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""
Process a standardized task structure and return results.
This method must be implemented by all concrete agent classes.
Args:
task: A dictionary containing:
- task_id: Unique ID for this task
- workflow_id: ID of the parent workflow (optional)
- prompt: The main instruction for the agent
- input_documents: List of document objects to process
- output_specifications: List of required output documents
- context: Additional contextual information
Returns:
A dictionary containing:
- feedback: Text response explaining what the agent did
- documents: List of document objects created by the agent
"""
# Base implementation - should be overridden by specialized agents
logger.warning(f"Agent {self.name} is using the default implementation of process_task")
return {
"feedback": f"The process_task method was not implemented by agent '{self.name}'.",
"documents": []
}
class AgentRegistry:
"""Central registry for all available agents in the system."""
_instance = None
@classmethod
def get_instance(cls):
"""Return a singleton instance of the agent registry."""
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self):
"""Initialize the agent registry."""
if AgentRegistry._instance is not None:
raise RuntimeError("Singleton instance already exists - use get_instance()")
self.agents = {}
self.ai_service = None
self._load_agents()
def _load_agents(self):
"""Load all available agents from modules."""
logger.info("Loading agent modules...")
# List of agent modules to load
agent_modules = []
agent_dir = os.path.dirname(__file__)
# Search the directory for agent modules
for filename in os.listdir(agent_dir):
if filename.startswith("chat_agent_") and filename.endswith(".py"):
agent_modules.append(filename[:-3]) # Remove .py extension
if not agent_modules:
logger.warning("No agent modules found")
return
logger.info(f"{len(agent_modules)} agent modules found")
# Load each agent module
for module_name in agent_modules:
try:
# Import the module
module = importlib.import_module(f"modules.{module_name}")
# Look for agent class or get_*_agent function
agent_name = module_name.split('_')[-1]
class_name = f"Agent{agent_name.capitalize()}"
getter_name = f"get_{agent_name}_agent"
agent = None
# Try to get the agent via the get_*_agent function
if hasattr(module, getter_name):
getter_func = getattr(module, getter_name)
agent = getter_func()
logger.info(f"Agent '{agent.name}' loaded via {getter_name}()")
# Alternatively, try to instantiate the agent directly
elif hasattr(module, class_name):
agent_class = getattr(module, class_name)
agent = agent_class()
logger.info(f"Agent '{agent.name}' directly instantiated")
if agent:
# Register the agent
self.register_agent(agent)
else:
logger.warning(f"No agent class or getter function found in module {module_name}")
except ImportError as e:
logger.error(f"Module {module_name} could not be imported: {e}")
except Exception as e:
logger.error(f"Error loading agent from module {module_name}: {e}")
def set_ai_service(self, ai_service):
"""Set the AI service for all agents."""
self.ai_service = ai_service
self.update_agent_dependencies()
def update_agent_dependencies(self):
"""Update dependencies for all registered agents."""
for agent_id, agent in self.agents.items():
if hasattr(agent, 'set_dependencies'):
agent.set_dependencies(ai_service=self.ai_service)
def register_agent(self, agent):
"""
Register an agent in the registry.
Args:
agent: The agent to register
"""
agent_id = getattr(agent, 'name', "unknown_agent")
# Initialize agent with dependencies
if hasattr(agent, 'set_dependencies'):
agent.set_dependencies(ai_service=self.ai_service)
self.agents[agent_id] = agent
logger.debug(f"Agent '{agent.name}' registered")
def get_agent(self, agent_identifier: str):
"""
Return an agent instance
Args:
agent_identifier: ID or type of the desired agent
Returns:
Agent instance or None if not found
"""
if agent_identifier in self.agents:
agent = self.agents[agent_identifier]
# Ensure the agent has the AI service
if hasattr(agent, 'set_dependencies') and self.ai_service:
agent.set_dependencies(ai_service=self.ai_service)
return agent
logger.error(f"Agent with identifier '{agent_identifier}' not found")
return None
def get_all_agents(self) -> Dict[str, Any]:
"""Return all registered agents."""
return self.agents
def get_agent_infos(self) -> List[Dict[str, Any]]:
"""Return information about all registered agents."""
agent_infos = []
seen_agents = set()
for agent in self.agents.values():
if agent not in seen_agents:
agent_infos.append(agent.get_agent_info())
seen_agents.add(agent)
return agent_infos
# Singleton factory for the agent registry
def get_agent_registry():
return AgentRegistry.get_instance()