gateway/modules/agentservice_protocol.py
2025-04-16 21:42:26 +02:00

338 lines
No EOL
11 KiB
Python

"""
Agent Communication Protocol module for the Agentservice.
Defines a standardized format for agents to exchange information.
"""
import json
import uuid
from typing import Dict, Any, List, Optional
from datetime import datetime
class AgentMessage:
    """
    Standard message format for inter-agent communication.

    Bundles the message content with routing information (sender and
    optional receiver), a message type, a free-form metadata dictionary,
    a list of document references and an optional conversation context ID.
    Every new instance receives a generated ``msg_<uuid4>`` ID and an
    ISO-8601 creation timestamp.
    """

    def __init__(
        self,
        content: str,
        sender_id: str,
        receiver_id: Optional[str] = None,
        message_type: str = "text",
        metadata: Optional[Dict[str, Any]] = None,
        documents: Optional[List[Dict[str, Any]]] = None,
        context_id: Optional[str] = None
    ):
        """
        Initialize an agent message.

        Args:
            content: The main message content
            sender_id: ID of the sending agent
            receiver_id: Optional ID of the receiving agent
            message_type: Type of message (text, task, result, etc.)
            metadata: Optional metadata dictionary
            documents: Optional list of document references
            context_id: Optional conversation context ID
        """
        self.id = f"msg_{uuid.uuid4()}"
        # Naive local time (no timezone information), ISO-8601 formatted.
        self.timestamp = datetime.now().isoformat()
        self.content = content
        self.sender_id = sender_id
        self.receiver_id = receiver_id
        self.message_type = message_type
        # Falsy values (None, empty containers) are replaced by fresh
        # containers so instances never share a default object.
        self.metadata = metadata or {}
        self.documents = documents or []
        self.context_id = context_id

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert the message to a dictionary.

        Returns:
            A dictionary with the keys id, timestamp, content, sender_id,
            receiver_id, message_type, metadata, documents and context_id.
            The metadata and documents values are the message's own
            objects, not copies.
        """
        return {
            "id": self.id,
            "timestamp": self.timestamp,
            "content": self.content,
            "sender_id": self.sender_id,
            "receiver_id": self.receiver_id,
            "message_type": self.message_type,
            "metadata": self.metadata,
            "documents": self.documents,
            "context_id": self.context_id
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentMessage':
        """
        Create a message from a dictionary.

        Keys that are missing *or explicitly set to None* fall back to the
        same defaults: empty content, sender "unknown", type "text", and a
        freshly generated ID and timestamp.

        Args:
            data: Dictionary in the layout produced by to_dict()

        Returns:
            AgentMessage instance populated from the dictionary
        """
        def pick(key: str, default: Any) -> Any:
            # dict.get(key, default) only covers a missing key; a key that
            # is present with a None value (e.g. JSON null) must fall back
            # to the default as well.
            value = data.get(key)
            return default if value is None else value

        message = cls(
            content=pick("content", ""),
            sender_id=pick("sender_id", "unknown"),
            receiver_id=data.get("receiver_id"),
            message_type=pick("message_type", "text"),
            # The constructor already maps None to an empty container.
            metadata=data.get("metadata"),
            documents=data.get("documents"),
            context_id=data.get("context_id")
        )
        # Keep the serialized identity when present; otherwise retain the
        # values the constructor has just generated.
        message.id = pick("id", message.id)
        message.timestamp = pick("timestamp", message.timestamp)
        return message

    def to_json(self) -> str:
        """Convert the message to a JSON string."""
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_str: str) -> 'AgentMessage':
        """Create a message from a JSON string."""
        return cls.from_dict(json.loads(json_str))
class AgentCommunicationProtocol:
    """
    Defines the protocol for agents to communicate with each other.

    Provides static factory helpers that build AgentMessage objects for the
    supported message types (text, task, result, error, request, status)
    and converters between AgentMessage objects and system message
    dictionaries.
    """

    @staticmethod
    def _value_or_default(value: Any, default: Any) -> Any:
        """Return value, or default when value is None."""
        return default if value is None else value

    @staticmethod
    def create_text_message(
        content: str,
        sender_id: str,
        receiver_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
        documents: Optional[List[Dict[str, Any]]] = None,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create a simple text message.

        Args:
            content: The message text
            sender_id: ID of the sending agent
            receiver_id: Optional ID of the receiving agent
            metadata: Optional metadata dictionary
            documents: Optional list of document references
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "text"
        """
        return AgentMessage(
            content=content,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="text",
            metadata=metadata,
            documents=documents,
            context_id=context_id
        )

    @staticmethod
    def create_task_message(
        task_description: str,
        sender_id: str,
        receiver_id: str,
        input_data: Optional[Dict[str, Any]] = None,
        documents: Optional[List[Dict[str, Any]]] = None,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create a task assignment message.

        The metadata carries task_type "general", priority "normal", the
        given input data and a newly generated ``task_<uuid4>`` task ID.

        Args:
            task_description: Description of the task (becomes the content)
            sender_id: ID of the sending agent
            receiver_id: ID of the agent the task is assigned to
            input_data: Optional input data for the task
            documents: Optional list of document references
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "task"
        """
        metadata = {
            "task_type": "general",
            "input_data": input_data or {},
            "priority": "normal",
            "task_id": f"task_{uuid.uuid4()}"
        }
        return AgentMessage(
            content=task_description,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="task",
            metadata=metadata,
            documents=documents,
            context_id=context_id
        )

    @staticmethod
    def create_result_message(
        result_content: str,
        sender_id: str,
        receiver_id: str,
        task_id: str,
        output_data: Optional[Dict[str, Any]] = None,
        result_format: str = "text",
        documents: Optional[List[Dict[str, Any]]] = None,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create a task result message.

        The metadata records the task ID, the result format, the status
        "completed" and the given output data.

        Args:
            result_content: The result text (becomes the content)
            sender_id: ID of the sending agent
            receiver_id: ID of the receiving agent
            task_id: ID of the task this result belongs to
            output_data: Optional structured output data
            result_format: Format of the result, "text" by default
            documents: Optional list of document references
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "result"
        """
        metadata = {
            "task_id": task_id,
            "result_format": result_format,
            "status": "completed",
            "output_data": output_data or {}
        }
        return AgentMessage(
            content=result_content,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="result",
            metadata=metadata,
            documents=documents,
            context_id=context_id
        )

    @staticmethod
    def create_error_message(
        error_description: str,
        sender_id: str,
        receiver_id: Optional[str] = None,
        error_type: str = "general",
        error_details: Optional[Dict[str, Any]] = None,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create an error message.

        The metadata records the error type, the given error details and
        the severity "error".

        Args:
            error_description: Description of the error (becomes the content)
            sender_id: ID of the sending agent
            receiver_id: Optional ID of the receiving agent
            error_type: Type of error, "general" by default
            error_details: Optional dictionary with further details
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "error"
        """
        metadata = {
            "error_type": error_type,
            "error_details": error_details or {},
            "severity": "error"
        }
        return AgentMessage(
            content=error_description,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="error",
            metadata=metadata,
            context_id=context_id
        )

    @staticmethod
    def create_document_request_message(
        document_description: str,
        sender_id: str,
        receiver_id: str,
        filters: Optional[Dict[str, Any]] = None,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create a document request message.

        The metadata carries request_type "document", the given filters and
        a newly generated ``req_<uuid4>`` request ID.

        Args:
            document_description: Description of the requested document
                (becomes the content)
            sender_id: ID of the sending agent
            receiver_id: ID of the receiving agent
            filters: Optional filter dictionary
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "request"
        """
        metadata = {
            "request_type": "document",
            "filters": filters or {},
            "request_id": f"req_{uuid.uuid4()}"
        }
        return AgentMessage(
            content=document_description,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="request",
            metadata=metadata,
            context_id=context_id
        )

    @staticmethod
    def create_status_update_message(
        status_description: str,
        sender_id: str,
        receiver_id: Optional[str] = None,
        status: str = "in_progress",
        progress: float = 0.0,
        context_id: Optional[str] = None
    ) -> AgentMessage:
        """
        Create a status update message.

        The metadata records the status, the progress value (stored as
        given, without validation) and update_type "status".

        Args:
            status_description: Description of the status (becomes the content)
            sender_id: ID of the sending agent
            receiver_id: Optional ID of the receiving agent
            status: Status label, "in_progress" by default
            progress: Progress value, 0.0 by default
            context_id: Optional conversation context ID

        Returns:
            AgentMessage with message_type "status"
        """
        metadata = {
            "status": status,
            "progress": progress,
            "update_type": "status"
        }
        return AgentMessage(
            content=status_description,
            sender_id=sender_id,
            receiver_id=receiver_id,
            message_type="status",
            metadata=metadata,
            context_id=context_id
        )

    @staticmethod
    def convert_system_message_to_agent_message(system_message: Dict[str, Any], sender_id: str) -> AgentMessage:
        """
        Convert a system message to an agent message.

        The system message's ID and "started_at" value become the agent
        message's ID and timestamp; new values are generated when they are
        missing or None. The workflow ID is used as the context ID.

        Args:
            system_message: Message object from the workflow
            sender_id: ID of the sending agent

        Returns:
            AgentMessage instance with message_type "system"
        """
        pick = AgentCommunicationProtocol._value_or_default

        # Extract basic information. A key that is present but None must
        # not end up as the message ID or timestamp, so the fallback is
        # applied to None values as well as to missing keys.
        content = system_message.get("content", "")
        message_id = pick(system_message.get("id"), f"msg_{uuid.uuid4()}")
        timestamp = pick(system_message.get("started_at"), datetime.now().isoformat())

        # Create metadata; fields absent from the system message are
        # stored as None.
        metadata = {
            "agent_type": system_message.get("agent_type"),
            "agent_name": system_message.get("agent_name"),
            "workflow_id": system_message.get("workflow_id"),
            "sequence_no": system_message.get("sequence_no"),
            "result_format": system_message.get("result_format"),
            "original_message_id": message_id
        }

        # Create agent message
        agent_message = AgentMessage(
            content=content,
            sender_id=sender_id,
            message_type="system",
            metadata=metadata,
            documents=system_message.get("documents", []),
            context_id=system_message.get("workflow_id")
        )

        # Set original ID and timestamp
        agent_message.id = message_id
        agent_message.timestamp = timestamp
        return agent_message

    @staticmethod
    def convert_agent_message_to_system_message(agent_message: AgentMessage) -> Dict[str, Any]:
        """
        Convert an agent message to a system message.

        The resulting dictionary always has status "completed", role
        "assistant", zeroed data_stats and a "finished_at" value set to the
        current time. Metadata fields that are missing or None fall back to
        their defaults (sequence_no 0, result_format "text").

        Args:
            agent_message: The agent message to convert

        Returns:
            System message dictionary
        """
        pick = AgentCommunicationProtocol._value_or_default

        message_data = agent_message.to_dict()
        metadata = message_data.get("metadata") or {}

        # Create system message structure. Metadata produced by
        # convert_system_message_to_agent_message contains explicit None
        # placeholders, so defaults are applied to None values too;
        # a plain dict.get(key, default) would pass the None through.
        system_message = {
            "id": pick(message_data.get("id"), f"msg_{uuid.uuid4()}"),
            "workflow_id": message_data.get("context_id"),
            "started_at": pick(message_data.get("timestamp"), datetime.now().isoformat()),
            "finished_at": datetime.now().isoformat(),
            "sequence_no": pick(metadata.get("sequence_no"), 0),
            "status": "completed",
            "role": "assistant",
            "data_stats": {
                "processing_time": 0.0,
                "token_count": 0,
                "bytes_sent": 0,
                "bytes_received": 0
            },
            "agent_type": metadata.get("agent_type"),
            "agent_id": message_data.get("sender_id"),
            "agent_name": metadata.get("agent_name"),
            "result_format": pick(metadata.get("result_format"), "text"),
            "content": pick(message_data.get("content"), ""),
            "documents": pick(message_data.get("documents"), [])
        }

        # If this is a result message, add more metadata
        if message_data.get("message_type") == "result":
            system_message["output_data"] = pick(metadata.get("output_data"), {})
            system_message["task_id"] = metadata.get("task_id")
        return system_message
# Factory function
def get_agent_protocol():
    """
    Factory accessor for the agent communication protocol.

    Returns:
        The AgentCommunicationProtocol class itself (not an instance).
    """
    protocol = AgentCommunicationProtocol
    return protocol