gateway/modules/workflow/agentManager.py
2025-05-28 01:51:10 +02:00

271 lines
No EOL
9.7 KiB
Python

"""
Agent Manager Module for managing, initializing, and executing agents.
"""
import os
import logging
import importlib
import asyncio
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime, UTC
from modules.workflow.agentBase import AgentBase
from modules.interfaces.serviceChatModel import AgentResponse, Task, ChatMessage
import uuid
from modules.workflow.taskManager import getTaskManager
logger = logging.getLogger(__name__)
class AgentManager:
    """Central manager for all agents in the system, handling registration, initialization, and execution.

    The class is a process-wide singleton: obtain it through ``getInstance()``.
    Constructing it directly while an instance already exists raises
    ``RuntimeError``. On construction it discovers agent modules in the
    sibling ``agents`` package directory and registers one agent per module.
    """

    _instance = None

    @classmethod
    def getInstance(cls):
        """Return the singleton instance of the agent manager, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self):
        """
        Initialize the agent manager and load all discoverable agents.
        Raises:
            RuntimeError: If a singleton instance already exists.
        """
        if AgentManager._instance is not None:
            raise RuntimeError("Singleton instance already exists - use getInstance()")
        # Registered agents, keyed by the agent's ``name`` attribute.
        self.agents: Dict[str, AgentBase] = {}
        # Service container; stays None until initialize() accepts one.
        self.service = None
        self.taskManager = getTaskManager()
        self._loadAgents()

    def initialize(self, service=None):
        """
        Initialize or update the manager with service references.
        Args:
            service: Service container; it must expose the attributes
                'base', 'msft' and 'google' to be accepted.
        Returns:
            False if a service was given but lacks a required interface
            (the manager is left unchanged), True otherwise - including
            when no service was passed at all.
        """
        if not service:
            # Nothing to wire up; the original contract reports success here.
            return True

        # Validate required interfaces
        required_interfaces = ['base', 'msft', 'google']
        missing_interfaces = [
            interface for interface in required_interfaces
            if not hasattr(service, interface)
        ]
        if missing_interfaces:
            logger.warning(f"Service container missing required interfaces: {', '.join(missing_interfaces)}")
            return False

        self.service = service
        # Hand the service to every agent that knows how to accept one
        for agent in self.agents.values():
            if hasattr(agent, 'setService'):
                agent.setService(service)
        return True

    def _loadAgents(self):
        """
        Load all available agents from modules.

        Scans the ``agents`` directory next to this module's package for
        files named ``agent*.py``, imports each as ``modules.agents.<name>``
        and obtains an agent either from a ``getAgent<Suffix>()`` function or
        by instantiating an ``Agent<Suffix>`` class. Failures are logged and
        never propagated, so one broken module cannot block the others.
        """
        logger.info("Loading agent modules...")
        agentDir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "agents")

        # A missing or unreadable directory is logged like any other load
        # failure instead of escaping from the constructor.
        try:
            # Sorted so the registration order does not depend on the filesystem.
            filenames = sorted(os.listdir(agentDir))
        except OSError as e:
            logger.error(f"Agent directory '{agentDir}' could not be read: {e}")
            return

        # Module names are the file names without the .py extension
        agentModules = [
            filename[:-3]
            for filename in filenames
            if filename.startswith("agent") and filename.endswith(".py")
        ]
        if not agentModules:
            logger.warning("No agent modules found")
            return
        logger.info(f"{len(agentModules)} agent modules found")

        for moduleName in agentModules:
            try:
                module = importlib.import_module(f"modules.agents.{moduleName}")
                # Strip only the leading "agent" prefix; splitting on every
                # occurrence would mangle names that contain "agent" again.
                agentName = moduleName.removeprefix("agent")
                className = f"Agent{agentName}"
                getterName = f"getAgent{agentName}"
                agent = None
                # Prefer the module's getter function ...
                if hasattr(module, getterName):
                    getterFunc = getattr(module, getterName)
                    agent = getterFunc()
                    logger.info(f"Agent '{agent.name}' loaded via {getterName}()")
                # ... and fall back to instantiating the class directly
                elif hasattr(module, className):
                    agentClass = getattr(module, className)
                    agent = agentClass()
                    logger.info(f"Agent '{agent.name}' directly instantiated")
                if agent:
                    self.registerAgent(agent)
                else:
                    logger.warning(f"No agent class or getter function found in module {moduleName}")
            except ImportError as e:
                logger.error(f"Module {moduleName} could not be imported: {e}")
            except Exception as e:
                logger.error(f"Error loading agent from module {moduleName}: {e}")

    def registerAgent(self, agent: AgentBase):
        """
        Register an agent in the manager.

        An agent without a ``name`` attribute is stored under the key
        "unknown_agent". Registering a second agent under an existing key
        replaces the first.
        Args:
            agent: The agent to register
        """
        agentId = getattr(agent, 'name', "unknown_agent")
        self.agents[agentId] = agent
        # Log the resolved id; agent.name may not exist (see fallback above).
        logger.debug(f"Agent '{agentId}' registered")

    def getAgent(self, agentIdentifier: str) -> Optional[AgentBase]:
        """
        Return an agent instance.
        Args:
            agentIdentifier: Name under which the agent was registered
        Returns:
            Agent instance or None if not found (an error is logged)
        """
        agent = self.agents.get(agentIdentifier)
        if agent is None:
            logger.error(f"Agent with identifier '{agentIdentifier}' not found")
        return agent

    def getAllAgents(self) -> Dict[str, AgentBase]:
        """
        Get all registered agents.
        Returns:
            Shallow copy of the dictionary mapping agent names to agent
            instances, so callers cannot alter the registry itself
        """
        return self.agents.copy()

    def getAgentInfos(self) -> List[Dict[str, Any]]:
        """
        Return information about all registered agents.
        Returns:
            One ``getAgentInfo()`` result per distinct agent; an agent that
            is registered under several keys is reported once
        """
        agentInfos = []
        seenAgents = set()
        for agent in self.agents.values():
            if agent not in seenAgents:
                agentInfos.append(agent.getAgentInfo())
                seenAgents.add(agent)
        return agentInfos

    def _buildErrorResponse(self, task: Task, errorMsg: str) -> AgentResponse:
        """Build a failed AgentResponse carrying errorMsg as a system chat message."""
        timestamp = datetime.now(UTC).isoformat()
        return AgentResponse(
            success=False,
            message=ChatMessage(
                id=str(uuid.uuid4()),
                workflowId=task.workflowId,
                agentName=task.agentName,
                message=errorMsg,
                role="system",
                status="error",
                sequenceNr=0,
                startedAt=timestamp,
                finishedAt=timestamp,
                success=False
            ),
            performance={},
            progress=0.0
        )

    async def executeAgent(self, task: Task) -> Tuple[AgentResponse, Task]:
        """
        Execute an agent for a given task.

        Looks up the agent named by ``task.agentName``, marks the task as
        running, awaits ``agent.execute(task)`` and records status, result,
        progress and timing on the task. Exceptions raised during execution
        are caught, logged and converted into a failed response.
        Args:
            task: The task to execute
        Returns:
            Tuple of (AgentResponse, updated Task)
        """
        agent = self.getAgent(task.agentName)
        if not agent:
            error_msg = f"Agent '{task.agentName}' not found"
            logger.error(error_msg)
            failedTask = Task(**{**task.to_dict(), "status": "failed", "error": error_msg})
            return self._buildErrorResponse(task, error_msg), failedTask

        try:
            # Update task status
            task = self.taskManager.updateTaskStatus(task, "running")
            task.startedAt = datetime.now(UTC).isoformat()

            # Execute agent and time the call
            startTime = datetime.now(UTC)
            agentResult = await agent.execute(task)
            endTime = datetime.now(UTC)

            performance = {
                "duration": (endTime - startTime).total_seconds(),
                "startTime": startTime.isoformat(),
                "endTime": endTime.isoformat()
            }

            # Update task with result
            task.status = "completed" if agentResult.success else "failed"
            task.finishedAt = endTime.isoformat()
            task.result = agentResult.message
            task.progress = agentResult.progress
            task.performance = performance

            # Re-wrap the agent's result with the measured performance data.
            # progress is forwarded explicitly so it is not lost in the copy.
            response = AgentResponse(
                success=agentResult.success,
                message=agentResult.message,
                performance=performance,
                progress=agentResult.progress
            )

            # Let the task manager finalize the task
            if response.success:
                task = self.taskManager.completeTask(task, response.message)
            else:
                failureReason = response.message.message if response.message else "Unknown error"
                task.error = failureReason
                task = self.taskManager.handleTaskError(task, failureReason)
            return response, task
        except Exception as e:
            error_msg = f"Error executing agent '{task.agentName}': {str(e)}"
            logger.error(error_msg, exc_info=True)
            error_response = self._buildErrorResponse(task, error_msg)
            # Update task with error
            task = self.taskManager.handleTaskError(task, error_msg)
            return error_response, task
# Singleton factory for the agent manager
def getAgentManager():
    """Return the shared AgentManager, delegating to AgentManager.getInstance()."""
    manager = AgentManager.getInstance()
    return manager