gateway/gwserver/modules/agentservice_part_agents.py
2025-03-25 11:59:04 +01:00

139 lines
No EOL
4.7 KiB
Python

import os
import json
import logging
from typing import Dict, Any, List, Optional
# Logger konfigurieren
logger = logging.getLogger(__name__)
def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None) -> str:
"""Minimalist agent instructions"""
if agent and agent.get("instructions"):
instructions = agent.get("instructions")
else:
instructions = """
Analysiere die Anfrage gründlich.
Liefere präzise und hilfreiche Antworten.
Strukturiere deine Antwort klar.
"""
file_access = """
Dateibefehl: [[FILE:load_file(file_id=ID, complete=true)]]
"""
return instructions + file_access
def get_default_agent_instructions() -> str:
"""
Gibt Standard-Anweisungen für einen Agenten zurück,
wenn keine spezifischen Anweisungen verfügbar sind.
Diese Funktion gibt generische Anweisungen zurück, unabhängig vom Agententyp.
"""
return """
Als Agent ist es deine Aufgabe, Anfragen zu analysieren und entsprechend deinen Fähigkeiten zu bearbeiten.
Folge diesen allgemeinen Anweisungen:
1. Verstehe die Anfrage gründlich
2. Analysiere relevante Daten und Informationen
3. Liefere präzise und hilfreiche Antworten
4. Strukturiere deine Antwort klar und verständlich
In deiner Antwort:
- Beginne mit einer Zusammenfassung der Anfrage
- Gib gut begründete Antworten oder Empfehlungen
- Führe wichtige Erkenntnisse klar auf
- Schließe mit konkreten nächsten Schritten oder Empfehlungen ab
"""
def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
"""
Initialisiert die Agenten mit ihren Fähigkeiten und Status
Args:
agents: Liste der Agenten aus dem Workflow
Returns:
Dictionary mit Agent-IDs als Schlüssel und Agent-Informationen
"""
available_agents = {}
for agent in agents:
agent_id = agent["id"]
agent_name = agent["name"]
agent_type = agent["type"]
agent_capabilities = agent.get("capabilities", "")
# Kopiere alle Felder vom Original-Agenten und füge used-Status hinzu
agent_data = agent.copy()
agent_data["used"] = False
available_agents[agent_id] = agent_data
# Log agent data for debugging
logger.debug(f"Initialized agent: {agent_name} (Type: {agent_type})")
if "instructions" in agent_data:
logger.debug(f"Agent {agent_name} has instructions of length: {len(agent_data['instructions'])}")
logger.info(f"Initialized {len(available_agents)} agents for workflow")
return available_agents
def get_moderator_prompt(available_agents: Dict[str, Dict[str, Any]]) -> str:
"""Streamlined moderator prompt"""
base = "Du bist Moderator eines Multi-Agent-Systems. Koordiniere die Agenten, um die Anfrage zu erfüllen."
agents_list = "\nAgenten:\n"
for agent_id, agent in available_agents.items():
status = "" if agent["used"] else ""
agents_list += f"- {agent['name']} ({agent['type']}): {status}\n"
instructions = """
Pro Runde: Wähle EINEN Agenten ODER beende den Workflow nur wenn die Anfrage vollständig beantwortet ist.
Bei Agentenwahl: "Ich wähle [Agentname]"
Bei Abschluss: "Workflow beenden"
"""
return base + agents_list + instructions
def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[str, str]:
"""Minimal agent prompt"""
content = f"""
Du bist Agent {agent['name']} ({agent['type']}).
{agent_instructions}
Format: [Agent: {agent['name']}] Deine Antwort...
""".strip()
return {"role": "system", "content": content}
def find_next_agent(moderator_text: str, available_agents: Dict[str, Dict[str, Any]]) -> Optional[str]:
"""Simplified agent selection logic"""
text = moderator_text.lower()
# Check for workflow completion
if "workflow beenden" in text:
return "WORKFLOW_COMPLETE"
# Look for "ich wähle" pattern
if "ich wähle" in text:
for agent_id, agent in available_agents.items():
if agent["name"].lower() in text:
return agent_id
# Simple name matching
for agent_id, agent in available_agents.items():
if agent["name"].lower() in text:
return agent_id
# Fallback: first unused agent
for agent_id, agent in available_agents.items():
if not agent["used"]:
return agent_id
# Last resort: first agent
return list(available_agents.keys())[0] if available_agents else None