gateway/gwserver/modules/agentservice_base.py
2025-04-14 20:05:33 +02:00

260 lines
8.9 KiB
Python

"""
Enhanced base agent class for the Agentservice.
Provides improved communication and document handling capabilities.
"""
import logging
import json
from typing import Dict, Any, List, Optional, Tuple, Union
import asyncio
from datetime import datetime
import uuid
logger = logging.getLogger(__name__)
class BaseAgent:
"""
Enhanced base agent class with improved communication capabilities.
All specialized agents should inherit from this class.
"""
def __init__(self):
"""Initialize the enhanced agent."""
self.id = "base_agent"
self.name = "Base Agent"
self.type = "base"
self.description = "Base agent for the Agentservice"
self.capabilities = "Basic agent operations"
self.result_format = "Text"
# New properties for document handling
self.supports_documents = True
self.document_capabilities = ["read", "reference"]
self.required_context = []
# System dependencies
self.ai_service = None
self.document_handler = None
self.lucydom_interface = None
def set_dependencies(self, ai_service=None, document_handler=None, lucydom_interface=None):
"""
Set system dependencies.
Args:
ai_service: AI service for text generation
document_handler: Document handler for document operations
lucydom_interface: LucyDOM interface for database access
"""
self.ai_service = ai_service
self.document_handler = document_handler
self.lucydom_interface = lucydom_interface
def get_agent_info(self) -> Dict[str, Any]:
"""
Get detailed information about the agent.
Returns:
Dictionary with agent information
"""
return {
"id": self.id,
"name": self.name,
"type": self.type,
"description": self.description,
"capabilities": self.capabilities,
"result_format": self.result_format,
"supports_documents": self.supports_documents,
"document_capabilities": self.document_capabilities,
"required_context": self.required_context
}
def get_capabilities(self) -> List[str]:
"""
Get a list of agent capabilities.
Returns:
List of capability strings
"""
# Split capabilities into a list
if isinstance(self.capabilities, str):
return [cap.strip() for cap in self.capabilities.split(",")]
return []
def get_supported_formats(self) -> List[str]:
"""
Get supported output formats.
Returns:
List of supported format strings
"""
if isinstance(self.result_format, str):
return [fmt.strip() for fmt in self.result_format.split(",")]
return ["Text"]
async def process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Process a message and generate a response.
Args:
message: Input message
context: Optional context information
Returns:
Response message
"""
# Basic implementation - should be overridden by specialized agents
if not self.ai_service:
logger.warning(f"Agent {self.id} has no AI service configured")
return {
"role": "assistant",
"content": f"I'm {self.name}, but I'm not properly configured. Please set up the AI service.",
"agent_id": self.id,
"agent_type": self.type,
"result_format": "Text"
}
# Process documents if available and set up document handler
document_context = ""
if self.supports_documents and self.document_handler and message.get("documents"):
document_context = await self._process_documents(message)
# Create enhanced prompt
prompt = self._create_enhanced_prompt(message, document_context, context)
# Generate response
try:
response_content = await self.ai_service.call_api([
{"role": "system", "content": self._get_system_prompt()},
{"role": "user", "content": prompt}
])
# Process the response to extract any special instructions or status
content, status = self._process_response(response_content)
return {
"role": "assistant",
"content": content,
"agent_id": self.id,
"agent_type": self.type,
"agent_name": self.name,
"result_format": self.result_format,
"status": status,
"workflow_id": message.get("workflow_id"),
"documents": message.get("documents", []) # Pass through documents
}
except Exception as e:
logger.error(f"Error in agent {self.id}: {str(e)}")
return {
"role": "assistant",
"content": f"I encountered an error: {str(e)}",
"agent_id": self.id,
"agent_type": self.type,
"result_format": "Text",
"status": "error"
}
async def _process_documents(self, message: Dict[str, Any]) -> str:
"""
Process documents in the message.
Args:
message: Input message with documents
Returns:
Document context as text
"""
# Simply extract text from documents
if not self.document_handler:
return ""
return self.document_handler.merge_document_contents(message)
def _create_enhanced_prompt(self, message: Dict[str, Any], document_context: str, context: Dict[str, Any] = None) -> str:
"""
Create an enhanced prompt with context.
Args:
message: Input message
document_context: Document context
context: Optional additional context
Returns:
Enhanced prompt
"""
prompt = message.get("content", "")
# Add document context if available
if document_context:
prompt += f"\n\n=== DOCUMENT CONTEXT ===\n{document_context}"
# Add any additional context
if context:
# Add expected format if specified
if "expected_format" in context:
prompt += f"\n\nPlease format your response as: {context['expected_format']}"
# Add dependency outputs if available
if "dependency_outputs" in context:
prompt += "\n\n=== OUTPUTS FROM PREVIOUS ACTIVITIES ===\n"
for key, value in context["dependency_outputs"].items():
if isinstance(value, dict) and "content" in value:
prompt += f"\n--- {key} ---\n{value['content']}\n"
else:
prompt += f"\n--- {key} ---\n{str(value)}\n"
return prompt
def _get_system_prompt(self) -> str:
"""
Get the system prompt for the agent.
Returns:
System prompt string
"""
return f"""
You are {self.name}, a specialized {self.type} agent.
{self.description}
Your capabilities include: {self.capabilities}
You should format your responses according to: {self.result_format}
Respond clearly and helpfully to the user's request.
When appropriate, include a status indicator at the end of your message:
[STATUS: COMPLETE] - When you've fully addressed the request
[STATUS: PARTIAL] - When you've partially addressed the request
[STATUS: QUESTION] - When you need more information
"""
def _process_response(self, response: str) -> Tuple[str, str]:
"""
Process the response to extract status and clean content.
Args:
response: Raw response from the AI
Returns:
Tuple of (cleaned content, status)
"""
# Default status
status = "complete"
# Check for status tags
import re
status_match = re.search(r'\[STATUS:\s*(COMPLETE|PARTIAL|QUESTION)\]', response, re.IGNORECASE)
if status_match:
status_value = status_match.group(1).lower()
# Remove the status tag
content = re.sub(r'\[STATUS:\s*(COMPLETE|PARTIAL|QUESTION)\]', '', response, flags=re.IGNORECASE).strip()
return content, status_value
return response, status
# Factory functions
def get_enhanced_base_agent() -> BaseAgent:
"""Get an instance of the enhanced base agent."""
return BaseAgent()