""" Agent Registry Module. Provides a central registry system for all available agents. Optimized for the standardized task processing pattern. """ import os import logging import importlib import uuid from datetime import datetime from typing import Dict, Any, List, Optional from modules.mimeUtils import isTextMimeType, determineContentEncoding logger = logging.getLogger(__name__) """ Updates to the AgentBase class in workflowAgentsRegistry.py to include base64Encoded flag handling. """ class AgentBase: """ Base class for all chat agents. Defines the standardized interface for task processing. """ def __init__(self): """Initialize the base agent.""" self.name = "base-agent" self.label = "(Template)" self.description = "Basic agent functionality" self.capabilities = [] self.mydom = None self.workflowManager = None # Will be set by workflow manager def setDependencies(self, mydom=None): """Set external dependencies for the agent.""" self.mydom = mydom def getAgentInfo(self) -> Dict[str, Any]: """ Return standardized information about the agent's capabilities. Returns: Dictionary with name, description, and capabilities """ return { "name": self.name, "description": self.description, "capabilities": self.capabilities } async def processTask(self, task: Dict[str, Any]) -> Dict[str, Any]: """ Process a standardized task structure and return results. This method must be implemented by all concrete agent classes. Args: task: A dictionary containing: - taskId: Unique ID for this task - workflowId: ID of the parent workflow - prompt: The main instruction for the agent - inputDocuments: List of document objects to process - outputSpecifications: List of required output documents - context: Additional contextual information including: - workflow: The complete workflow object - workflowRound: Current workflow round - agentType: Type of agent - timestamp: Task timestamp - language: User language Returns: A dictionary containing: - feedback: Text response explaining what the agent did - documents: List of document objects created by the agent, each containing a "base64Encoded" flag in addition to "label" and "content" """ # Base implementation - should be overridden by specialized agents logger.warning(f"Agent {self.name} is using the default implementation of processTask") return { "feedback": f"The processTask method was not implemented by agent '{self.name}'.", "documents": [] } def determineBase64EncodingFlag(self, filename: str, content: Any, mimeType: str = None) -> bool: """Wrapper for the utility function""" return determineContentEncoding(filename, content, mimeType) def isTextMimeType(self, mimeType: str) -> bool: """Wrapper for the utility function""" return isTextMimeType(mimeType) def formatAgentDocumentOutput(self, label: str, content: Any, mimeType: str = None) -> Dict[str, Any]: """ Format agent output as a document. Args: label: Label for the document content: Content of the document mimeType: Optional MIME type for the document """ # Create document structure doc = { "id": str(uuid.uuid4()), "name": label, "ext": "txt", # Default extension "data": content, "base64Encoded": False, "metadata": { "isText": True } } # Set MIME type if provided if mimeType: doc["mimeType"] = mimeType # Update extension based on MIME type if mimeType == "text/markdown": doc["ext"] = "md" elif mimeType == "text/html": doc["ext"] = "html" elif mimeType == "text/csv": doc["ext"] = "csv" elif mimeType == "application/json": doc["ext"] = "json" elif mimeType.startswith("image/"): doc["ext"] = mimeType.split("/")[1] doc["metadata"]["isText"] = False elif mimeType == "application/pdf": doc["ext"] = "pdf" doc["metadata"]["isText"] = False return doc class AgentRegistry: """Central registry for all available agents in the system.""" _instance = None @classmethod def getInstance(cls): """Return a singleton instance of the agent registry.""" if cls._instance is None: cls._instance = cls() return cls._instance def __init__(self): """Initialize the agent registry.""" if AgentRegistry._instance is not None: raise RuntimeError("Singleton instance already exists - use getInstance()") self.agents = {} self.mydom = None self._loadAgents() def _loadAgents(self): """Load all available agents from modules.""" logger.info("Loading agent modules...") # List of agent modules to load agentModules = [] agentDir = os.path.dirname(__file__) # Search the directory for agent modules for filename in os.listdir(agentDir): if filename.startswith("agent") and filename.endswith(".py"): agentModules.append(filename[0:-3]) # Remove .py extension if not agentModules: logger.warning("No agent modules found") return logger.info(f"{len(agentModules)} agent modules found") # Load each agent module for moduleName in agentModules: try: # Import the module module = importlib.import_module(f"modules.{moduleName}") # Look for agent class or get_*_agent function agentName = moduleName.split("agent")[-1] className = f"Agent{agentName}" getterName = f"getAgent{agentName}" agent = None # Try to get the agent via the get*Agent function if hasattr(module, getterName): getterFunc = getattr(module, getterName) agent = getterFunc() logger.info(f"Agent '{agent.name}' loaded via {getterName}()") # Alternatively, try to instantiate the agent directly elif hasattr(module, className): agentClass = getattr(module, className) agent = agentClass() logger.info(f"Agent '{agent.name}' directly instantiated") if agent: # Register the agent self.registerAgent(agent) else: logger.warning(f"No agent class or getter function found in module {moduleName}") except ImportError as e: logger.error(f"Module {moduleName} could not be imported: {e}") except Exception as e: logger.error(f"Error loading agent from module {moduleName}: {e}") def setMydom(self, mydom): """Set the AI service for all agents.""" self.mydom = mydom self.updateAgentDependencies() def setWorkflowManager(self, workflowManager): """Set the workflow manager reference for all agents.""" for agent in self.agents.values(): agent.workflowManager = workflowManager def updateAgentDependencies(self): """Update dependencies for all registered agents.""" for agentId, agent in self.agents.items(): if hasattr(agent, 'setDependencies'): agent.setDependencies(mydom=self.mydom) def registerAgent(self, agent): """ Register an agent in the registry. Args: agent: The agent to register """ agentId = getattr(agent, 'name', "unknown_agent") # Initialize agent with dependencies if hasattr(agent, 'setDependencies'): agent.setDependencies(mydom=self.mydom) self.agents[agentId] = agent logger.debug(f"Agent '{agent.name}' registered") def getAgent(self, agentIdentifier: str): """ Return an agent instance Args: agentIdentifier: ID or type of the desired agent Returns: Agent instance or None if not found """ if agentIdentifier in self.agents: agent = self.agents[agentIdentifier] # Ensure the agent has the AI service if self.mydom: agent.mydom = self.mydom return agent logger.error(f"Agent with identifier '{agentIdentifier}' not found") return None def getAllAgents(self) -> Dict[str, Any]: """Return all registered agents.""" return self.agents def getAgentInfos(self) -> List[Dict[str, Any]]: """Return information about all registered agents.""" agentInfos = [] seenAgents = set() for agent in self.agents.values(): if agent not in seenAgents: agentInfos.append(agent.getAgentInfo()) seenAgents.add(agent) return agentInfos # Singleton factory for the agent registry def getAgentRegistry(): return AgentRegistry.getInstance()