1259 lines
53 KiB
Python
1259 lines
53 KiB
Python
import asyncio
|
|
import uuid
|
|
import os
|
|
import logging
|
|
import json
|
|
from typing import List, Dict, Any, Optional, Tuple
|
|
from datetime import datetime
|
|
from fastapi import HTTPException
|
|
import configload as configload
|
|
|
|
# Import der Teilmodule
|
|
import modules.agentservice_part_filehandling as file_handling
|
|
import modules.agentservice_part_agents as agents
|
|
import modules.agentservice_part_results as results
|
|
|
|
# Logger konfigurieren
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Konfigurationsdaten laden
|
|
def load_config_data():
|
|
config = configload.load_config()
|
|
result = {
|
|
"application": {
|
|
"debug": config.get('Module_AgentserviceInterface', 'DEBUG'),
|
|
"upload_dir": config.get('Module_AgentserviceInterface', 'UPLOAD_DIR'),
|
|
"results_dir": config.get('Module_AgentserviceInterface', 'RESULTS_DIR'),
|
|
"max_history": config.get('Module_AgentserviceInterface', 'MAX_HISTORY'),
|
|
"ai_provider": config.get('Module_AgentserviceInterface', 'AI_PROVIDER'),
|
|
}
|
|
}
|
|
# Debug-Modus einstellen
|
|
if result["application"]["debug"].lower() == "true":
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logger.setLevel(logging.DEBUG)
|
|
logger.debug("Debug-Modus aktiviert")
|
|
return result
|
|
|
|
|
|
class AgentService:
|
|
"""
|
|
Service für die Verwaltung und Ausführung von Multi-Agent-Workflows mit verschiedenen Modellen.
|
|
"""
|
|
|
|
def __init__(self, mandate_id: int = None, user_id: int = None):
|
|
"""
|
|
Initialisiert den AgentService.
|
|
|
|
Args:
|
|
mandate_id: ID des aktuellen Mandanten (optional)
|
|
user_id: ID des aktuellen Benutzers (optional)
|
|
"""
|
|
# Mandanten- und Benutzerkontext
|
|
self.mandate_id = mandate_id
|
|
self.user_id = user_id
|
|
|
|
# Konfiguration laden
|
|
self.config = load_config_data()
|
|
|
|
# Verzeichnisse aus der Konfiguration übernehmen
|
|
self.results_dir = self.config["application"]["results_dir"]
|
|
self.upload_dir = self.config["application"]["upload_dir"]
|
|
self.max_history = int(self.config["application"]["max_history"])
|
|
|
|
# AI Provider aus der Konfiguration übernehmen
|
|
self.ai_provider = self.config["application"]["ai_provider"].lower()
|
|
|
|
# Verzeichnisse erstellen
|
|
os.makedirs(self.results_dir, exist_ok=True)
|
|
os.makedirs(self.upload_dir, exist_ok=True)
|
|
|
|
# Connector-Instanzen initialisieren
|
|
if self.ai_provider == "anthropic":
|
|
import connector_aichat_anthropic as service_aichat
|
|
self.service_aichat = service_aichat.ChatService()
|
|
logger.info("Anthropic AI Provider wird verwendet")
|
|
else:
|
|
import connector_aichat_openai as service_aichat
|
|
self.service_aichat = service_aichat.ChatService()
|
|
logger.info("OpenAI AI Provider wird verwendet")
|
|
|
|
import connector_aiweb_webscraping as service_aiscrap
|
|
self.service_aiscrap = service_aiscrap.WebScrapingService()
|
|
|
|
logger.info(f"AgentService initialisiert mit:")
|
|
logger.info(f" - AI Provider: {self.ai_provider}")
|
|
logger.info(f" - Ergebnisverzeichnis: {self.results_dir}")
|
|
logger.info(f" - Upload-Verzeichnis: {self.upload_dir}")
|
|
|
|
# Workflow-Speicher
|
|
self.workflows = {}
|
|
|
|
# Statistiken für die Datenmengen
|
|
self.data_stats = {}
|
|
|
|
async def execute_workflow(
|
|
self,
|
|
workflow_id: str,
|
|
prompt: str,
|
|
agents_list: List[Dict[str, Any]],
|
|
files: List[Dict[str, Any]]
|
|
) -> str:
|
|
"""
|
|
Führt einen Workflow mit den angegebenen Agenten und Dateien aus.
|
|
|
|
Anstatt die Agenten der Reihe nach abzuarbeiten, wird ein AI-Moderator verwendet,
|
|
der die Agenten basierend auf ihren Fähigkeiten und bisherigen Antworten steuert.
|
|
|
|
Verwendet file_handling.prepare_message_for_ai für die Vorbereitung der Nachrichten,
|
|
wobei alle Dateiinhalte vollständig gelesen werden.
|
|
"""
|
|
logger.info(f"Starte Workflow {workflow_id} mit {len(agents_list)} Agenten und {len(files)} Dateien")
|
|
|
|
# Mandanten- und Benutzerkontext in die Workflow-Daten aufnehmen
|
|
self.workflows[workflow_id] = {
|
|
"id": workflow_id,
|
|
"mandate_id": self.mandate_id,
|
|
"user_id": self.user_id,
|
|
"status": "running",
|
|
"progress": 0.0,
|
|
"started_at": datetime.now().isoformat(),
|
|
"completed_at": None,
|
|
"agent_statuses": {},
|
|
"logs": [],
|
|
"results": [],
|
|
# Statistik für diesen Workflow initialisieren
|
|
"data_stats": {
|
|
"sent_bytes": 0,
|
|
"received_bytes": 0
|
|
}
|
|
}
|
|
|
|
# Log-Eintrag für den Start des Workflows
|
|
self._add_log(workflow_id, "Workflow gestartet", "info")
|
|
self._add_log(workflow_id, f"Verarbeite {len(files)} Dateien...", "info")
|
|
|
|
# Dateikontexte vorbereiten
|
|
file_contexts = file_handling.prepare_file_contexts(files, self.upload_dir)
|
|
for fc in file_contexts:
|
|
logger.debug(f"Dateikontext: ID={fc['id']}, Name={fc['name']}, Typ={fc.get('type', 'unbekannt')}")
|
|
|
|
# Dateiinhalte lesen - EINMAL für den gesamten Workflow
|
|
file_contents = await file_handling.read_file_contents(
|
|
file_contexts,
|
|
self.upload_dir,
|
|
workflow_id,
|
|
self._add_log
|
|
)
|
|
|
|
# Erstelle einen formatierten Kontext für die Protokollierung
|
|
file_context_text = file_handling.format_file_context_text(file_contexts, file_contents)
|
|
logger.debug(f"Dateikontexte erstellt: {len(file_contexts)} Dateien")
|
|
|
|
self.workflows[workflow_id]["progress"] = 0.1
|
|
|
|
# Initialisiere den Chatverlauf für den Agenten-Dialog
|
|
chat_history = []
|
|
|
|
# Erstelle das standardisierte Nachrichtenobjekt für die initialen Dateien und den Prompt
|
|
# Verwende file_handling.prepare_message_for_ai mit dem Service
|
|
initial_message = await file_handling.prepare_message_for_ai(
|
|
file_contexts,
|
|
prompt,
|
|
file_contents,
|
|
self.service_aichat
|
|
)
|
|
|
|
# Initialen Prompt zum Chatverlauf hinzufügen
|
|
chat_history.append(initial_message)
|
|
|
|
# Datenstatistik aktualisieren - Schätzung der gesendeten Bytes
|
|
message_size = len(json.dumps(initial_message))
|
|
self.workflows[workflow_id]["data_stats"]["sent_bytes"] += message_size
|
|
|
|
# Initialisiere die verfügbaren Agenten mit ihren Fähigkeiten
|
|
available_agents = agents.initialize_agents(agents_list)
|
|
|
|
# Füge den User-Agent hinzu
|
|
user_info = await self._get_user_info(self.user_id)
|
|
user_name = user_info.get("full_name") or user_info.get("username") or f"Benutzer {self.user_id}"
|
|
|
|
available_agents["user_agent"] = {
|
|
"id": "user_agent",
|
|
"name": "User Agent",
|
|
"full_name": user_name,
|
|
"type": "user",
|
|
"capabilities": "Beantwortung von Fragen, Bereitstellung zusätzlicher Informationen, Entscheidungsfindung",
|
|
"description": f"Repräsentiert den Benutzer {user_name}",
|
|
"used": False
|
|
}
|
|
|
|
# Initialisiere den Status für jeden Agenten
|
|
for agent_id in available_agents:
|
|
self.workflows[workflow_id]["agent_statuses"][agent_id] = "pending"
|
|
|
|
# Speichere wichtige Daten im Workflow-Objekt für die spätere Fortsetzung
|
|
self.workflows[workflow_id]["chat_history"] = chat_history
|
|
self.workflows[workflow_id]["available_agents"] = available_agents
|
|
self.workflows[workflow_id]["file_contexts"] = file_contexts
|
|
self.workflows[workflow_id]["file_contents"] = file_contents
|
|
|
|
# Starte den Workflow mit dem Moderator
|
|
self._add_log(workflow_id, f"Starte Agenten-Tischrunde mit Moderator und {len(available_agents)} Agenten", "info")
|
|
|
|
# Maximale Anzahl der Runden zur Vermeidung endloser Schleifen
|
|
max_rounds = 12
|
|
current_round = 0
|
|
workflow_complete = False
|
|
waiting_for_user_input = False
|
|
|
|
# Hauptschleife für den Workflow
|
|
while current_round < max_rounds and not workflow_complete:
|
|
current_round += 1
|
|
self.workflows[workflow_id]["current_round"] = current_round
|
|
self._add_log(workflow_id, f"Starte Runde {current_round}", "info")
|
|
|
|
# Falls auf eine Benutzereingabe gewartet wird, nicht fortfahren
|
|
if waiting_for_user_input:
|
|
self._add_log(workflow_id, "Warte auf Benutzereingabe...", "info")
|
|
# Warten wir einfach kurz und beenden dann die Schleife, um im nächsten Polling weiterzumachen
|
|
await asyncio.sleep(2)
|
|
break
|
|
|
|
# Führe einen Moderator-Zyklus durch
|
|
workflow_complete, waiting_for_user_input, selected_agent_id = await self._run_moderator_cycle(
|
|
workflow_id,
|
|
chat_history,
|
|
available_agents,
|
|
file_contexts,
|
|
file_contents,
|
|
is_user_input_continuation=False # Initialer Aufruf, keine Fortsetzung nach Benutzereingabe
|
|
)
|
|
|
|
# Fortschritt aktualisieren
|
|
self.workflows[workflow_id]["progress"] = min(0.9, 0.1 + (current_round / max_rounds) * 0.8)
|
|
|
|
# Workflow abschließen, wenn nicht auf Benutzereingabe wartend
|
|
if not waiting_for_user_input:
|
|
if workflow_complete:
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self._add_log(workflow_id, "Workflow erfolgreich beendet", "success")
|
|
elif current_round >= max_rounds:
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self._add_log(workflow_id, f"Workflow nach {max_rounds} Runden automatisch beendet", "info")
|
|
else:
|
|
self.workflows[workflow_id]["status"] = "failed"
|
|
self._add_log(workflow_id, "Workflow mit Fehlern beendet", "error")
|
|
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Speichere Ergebnisse in Datei für spätere Verwendung
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
return workflow_id
|
|
|
|
|
|
async def _run_moderator_cycle(
|
|
self,
|
|
workflow_id: str,
|
|
chat_history: List[Dict[str, Any]],
|
|
available_agents: Dict[str, Dict[str, Any]],
|
|
file_contexts: List[Dict[str, Any]],
|
|
file_contents: Dict[str, str],
|
|
is_user_input_continuation: bool = False,
|
|
user_message: str = None
|
|
) -> Tuple[bool, bool, str]:
|
|
"""
|
|
Improved moderator cycle with structured agent selection and robust user agent handling
|
|
"""
|
|
# Initialize return values
|
|
workflow_complete = False
|
|
waiting_for_user_input = False
|
|
selected_agent_id = None
|
|
|
|
# Create a structured moderator prompt
|
|
moderator_system_prompt = self._create_structured_moderator_prompt(
|
|
available_agents,
|
|
is_user_input_continuation
|
|
)
|
|
|
|
# Moderator prompt as system message
|
|
moderator_prompt = {
|
|
"role": "system",
|
|
"content": moderator_system_prompt
|
|
}
|
|
|
|
# Create a copy of chat history for the moderator
|
|
moderator_chat = [moderator_prompt] + chat_history[-self.max_history:]
|
|
|
|
# Add summary of available agents with their status
|
|
agent_info = "Available agents:\n"
|
|
for agent_id, agent in available_agents.items():
|
|
status = "✓ Used" if agent["used"] else "✗ Not used yet"
|
|
agent_info += f"- {agent['name']} (Type: {agent['type']}): {agent.get('capabilities', '')}\n Status: {status}\n"
|
|
|
|
moderator_chat.append({
|
|
"role": "system",
|
|
"content": agent_info + "\nSelect the next agent or end the workflow if the task is complete."
|
|
})
|
|
|
|
# Let the moderator make the decision
|
|
try:
|
|
moderator_chat = await self._sanitize_messages(moderator_chat)
|
|
moderator_decision = await self.service_aichat.call_api(moderator_chat)
|
|
moderator_text = moderator_decision["choices"][0]["message"]["content"]
|
|
logger.debug(f"Full moderator decision text: {moderator_text}")
|
|
|
|
# Update data statistics
|
|
request_size = len(json.dumps(moderator_chat))
|
|
response_size = len(json.dumps(moderator_decision))
|
|
self.workflows[workflow_id]["data_stats"]["sent_bytes"] += request_size
|
|
self.workflows[workflow_id]["data_stats"]["received_bytes"] += response_size
|
|
|
|
# Try to parse structured agent selection from the moderator
|
|
selected_agent_id = self._parse_structured_agent_selection(
|
|
moderator_text,
|
|
available_agents
|
|
)
|
|
|
|
# Add the moderator's decision to chat history
|
|
logger.debug(f"pmd1 moderator_text (type: {type(moderator_text)}): {moderator_text}")
|
|
logger.debug(f"pmd1 chat_history (type: {type(chat_history)}): {chat_history}")
|
|
|
|
if not isinstance(moderator_text, str):
|
|
moderator_text = str(moderator_text)
|
|
|
|
chat_history.append({
|
|
"role": "assistant",
|
|
"content": f"[Moderator] {moderator_text}"
|
|
})
|
|
|
|
# Log the moderator's decision
|
|
self._add_log(workflow_id, f"Moderator decision: {moderator_text}", "info")
|
|
|
|
# Check if the workflow should be completed
|
|
if selected_agent_id == "WORKFLOW_COMPLETE":
|
|
self._add_log(workflow_id, "Moderator has ended the workflow", "success")
|
|
workflow_complete = True
|
|
return workflow_complete, waiting_for_user_input, selected_agent_id
|
|
|
|
# Check if User Agent is selected - CRITICAL PATH FOR USER INPUT
|
|
if selected_agent_id == "user_agent":
|
|
self._add_log(
|
|
workflow_id,
|
|
"Waiting for user input for User Agent",
|
|
"info",
|
|
"user_agent",
|
|
"User Agent"
|
|
)
|
|
# Mark that we're waiting for user input - this blocks further execution
|
|
waiting_for_user_input = True
|
|
self.workflows[workflow_id]["waiting_for_user"] = True
|
|
|
|
# Add request to chat history
|
|
chat_history.append({
|
|
"role": "assistant",
|
|
"content": f"[Moderator to User Agent] {moderator_text}"
|
|
})
|
|
|
|
# Update chat history in workflow
|
|
self.workflows[workflow_id]["chat_history"] = chat_history
|
|
|
|
# Save workflow state
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
# Return immediately to prevent further processing
|
|
return workflow_complete, waiting_for_user_input, selected_agent_id
|
|
|
|
# If no agent was selected, end the workflow
|
|
if not selected_agent_id:
|
|
self._add_log(workflow_id, "No agent selected. Ending workflow.", "warning")
|
|
workflow_complete = True
|
|
return workflow_complete, waiting_for_user_input, selected_agent_id
|
|
|
|
# Mark the selected agent as used
|
|
selected_agent = available_agents[selected_agent_id]
|
|
selected_agent["used"] = True
|
|
|
|
# Update agent status
|
|
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "running"
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Agent '{selected_agent['name']}' starts processing...",
|
|
"start",
|
|
selected_agent_id,
|
|
selected_agent['name']
|
|
)
|
|
|
|
# Create agent instructions
|
|
agent_instructions = agents.get_agent_instructions(
|
|
selected_agent["type"],
|
|
selected_agent,
|
|
file_contexts
|
|
)
|
|
|
|
# Create agent prompt
|
|
agent_prompt = agents.create_agent_prompt(selected_agent, agent_instructions)
|
|
|
|
# Create agent chat history
|
|
agent_chat = [agent_prompt] + chat_history[-self.max_history:]
|
|
|
|
# Handle web scraper agent type specifically
|
|
if selected_agent["type"] == "scraper":
|
|
self._add_log(
|
|
workflow_id,
|
|
"Performing web scraping...",
|
|
"info",
|
|
selected_agent_id,
|
|
selected_agent["name"]
|
|
)
|
|
|
|
# Use user message or initial prompt for scraping
|
|
scrape_prompt = user_message if is_user_input_continuation and user_message else \
|
|
(chat_history[0]["content"] if chat_history else "")
|
|
|
|
logger.debug("Web Scrape Prompt: "+str(scrape_prompt))
|
|
# Ensure scrape_prompt is a string
|
|
if not isinstance(scrape_prompt, str):
|
|
scrape_prompt = str(scrape_prompt)
|
|
|
|
# Always perform web scraping for scraper agent type
|
|
try:
|
|
web_data = await self.service_aiscrap.scrape_web_data(scrape_prompt)
|
|
if web_data:
|
|
web_data = web_data.strip() if isinstance(web_data, str) else web_data
|
|
agent_chat.append({
|
|
"role": "system",
|
|
"content": f"# Scraped Web Data\n{web_data}".strip()
|
|
})
|
|
self._add_log(
|
|
workflow_id,
|
|
"Web scraping completed",
|
|
"info",
|
|
selected_agent_id,
|
|
selected_agent["name"]
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error during web scraping: {str(e)}")
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Error during web scraping: {str(e)}",
|
|
"error",
|
|
selected_agent_id,
|
|
selected_agent["name"]
|
|
)
|
|
|
|
# Execute the agent
|
|
try:
|
|
agent_chat = await self._sanitize_messages(agent_chat)
|
|
agent_response = await self.service_aichat.call_api(agent_chat)
|
|
agent_text = agent_response["choices"][0]["message"]["content"]
|
|
|
|
# Update data statistics
|
|
request_size = len(json.dumps(agent_chat))
|
|
response_size = len(json.dumps(agent_response))
|
|
self.workflows[workflow_id]["data_stats"]["sent_bytes"] += request_size
|
|
self.workflows[workflow_id]["data_stats"]["received_bytes"] += response_size
|
|
|
|
# Add agent's response to chat history
|
|
chat_history.append({
|
|
"role": "assistant",
|
|
"content": agent_text
|
|
})
|
|
|
|
# Create agent result
|
|
prompt_for_result = user_message if is_user_input_continuation else \
|
|
(chat_history[0]["content"] if chat_history else "")
|
|
|
|
result = results.create_agent_result(
|
|
workflow_id,
|
|
selected_agent,
|
|
len(self.workflows[workflow_id]["results"]),
|
|
prompt_for_result,
|
|
file_contexts,
|
|
agent_text,
|
|
self.mandate_id,
|
|
self.user_id
|
|
)
|
|
self.workflows[workflow_id]["results"].append(result)
|
|
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Agent '{selected_agent['name']}' has completed processing",
|
|
"complete",
|
|
selected_agent_id,
|
|
selected_agent["name"]
|
|
)
|
|
|
|
# Update agent status
|
|
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "completed"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error executing agent '{selected_agent['name']}': {str(e)}")
|
|
self._add_log(
|
|
workflow_id,
|
|
f"Error during execution: {str(e)}",
|
|
"error",
|
|
selected_agent_id,
|
|
selected_agent["name"]
|
|
)
|
|
self.workflows[workflow_id]["agent_statuses"][selected_agent_id] = "failed"
|
|
|
|
# Add error message to chat history
|
|
chat_history.append({
|
|
"role": "assistant",
|
|
"content": f"[Error with agent '{selected_agent['name']}']: {str(e)}"
|
|
})
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in moderator phase: {str(e)}")
|
|
self._add_log(workflow_id, f"Error in moderator phase: {str(e)}", "error")
|
|
|
|
# Update chat history in workflow
|
|
self.workflows[workflow_id]["chat_history"] = chat_history
|
|
|
|
return workflow_complete, waiting_for_user_input, selected_agent_id
|
|
|
|
|
|
def _create_structured_moderator_prompt(self, available_agents, is_user_input_continuation):
|
|
"""
|
|
Creates an improved moderator prompt that requests a structured format for agent selection
|
|
and enforces proper user agent interaction
|
|
"""
|
|
# Check if User Agent has been used and confirmed
|
|
user_agent_used = False
|
|
user_agent_confirmed = False
|
|
|
|
if "user_agent" in available_agents:
|
|
user_agent_used = available_agents["user_agent"].get("used", False)
|
|
if user_agent_used:
|
|
for chat_entry in available_agents["user_agent"].get("chat_entries", []):
|
|
if any(confirmation in chat_entry.lower() for confirmation in
|
|
["ja", "yes", "bestätige", "confirm", "stimme zu", "agree", "akzeptiere", "accept"]):
|
|
user_agent_confirmed = True
|
|
break
|
|
|
|
base = """You are a moderator of a multi-agent system. Your task is to coordinate the agents
|
|
to fully address the request and deliver a concrete final result.
|
|
First step is always to ask an agento to coordinate the necessary steps.
|
|
|
|
IMPORTANT: The workflow should only end when ACTUAL RESULTS have been delivered"""
|
|
|
|
# Different conditions for ending the workflow
|
|
if not user_agent_confirmed:
|
|
base += """ AND the User Agent has explicitly confirmed with a 'YES' that they are satisfied with the result.
|
|
CRITICALLY IMPORTANT: Before ending the workflow, you MUST ask the User Agent whether they are satisfied with the results,
|
|
and they must EXPLICITLY respond with 'YES' or a clear confirmation!"""
|
|
|
|
# Add structured format requirement
|
|
base += """
|
|
|
|
RESPONSE FORMAT: You must respond with a clear agent selection in the following format:
|
|
{
|
|
"next_agent": "agent_id", // The ID of the selected agent (e.g., "user_agent", "analyzer", "writer")
|
|
"task": "description" // Clear description of the task for the agent
|
|
}
|
|
|
|
DO NOT use any other format for agent selection.
|
|
"""
|
|
|
|
# Add specific instruction for user agent interaction
|
|
base += """
|
|
IMPORTANT USER AGENT HANDLING:
|
|
- When selecting the User Agent, formulate a CLEAR QUESTION that requires their input.
|
|
- The User Agent MUST be able to provide a response before the workflow continues.
|
|
- Never reference a User Agent question without actually selecting the User Agent.
|
|
"""
|
|
|
|
# Add agent listing and instructions
|
|
agents_list = "\nAvailable agents:\n"
|
|
for agent_id, agent in available_agents.items():
|
|
status = "✓ Already used" if agent["used"] else "✗ Not used yet"
|
|
result_status = ""
|
|
if agent["used"] and agent.get("last_result_status"):
|
|
result_status = f" (Last response: {agent.get('last_result_status')})"
|
|
|
|
description = agent.get("description", "")
|
|
capabilities = agent.get("capabilities", "")
|
|
agents_list += f"- {agent['name']} (ID: {agent_id}, Type: {agent['type']}): {capabilities}\n {description}\n Status: {status}{result_status}\n"
|
|
|
|
# Add decision guidelines
|
|
instructions = """
|
|
Consider the STATUS declarations of agents when making your decision:
|
|
|
|
- [STATUS: ERGEBNIS] - The agent has delivered a complete result
|
|
- [STATUS: TEILWEISE] - The agent has delivered a partial result, more work is needed
|
|
- [STATUS: PLAN] - The agent has delivered a plan, no concrete results yet
|
|
|
|
Possible decisions:
|
|
- Select an agent: Provide a structured response with next_agent and task
|
|
"""
|
|
|
|
if not user_agent_confirmed:
|
|
instructions += """- For completion (only if [STATUS: ERGEBNIS] exists): You MUST FIRST ask the User Agent explicitly if they are satisfied with the results.
|
|
The User Agent MUST respond with "YES" or clear confirmation before the workflow can end!
|
|
|
|
IMPORTANT: You MUST NOT end the workflow before the User Agent has explicitly confirmed with "YES"!
|
|
Ask the User Agent a CLEAR, DIRECT question whether they are satisfied with the result or need more information.
|
|
"""
|
|
else:
|
|
instructions += """- For completion (only if [STATUS: ERGEBNIS] exists and User Agent has confirmed with "YES"): Select "WORKFLOW_COMPLETE" as next_agent to end the workflow.
|
|
|
|
IMPORTANT: Since the User Agent has already given their approval, you can now end the workflow if an agent has delivered a concrete [STATUS: ERGEBNIS]!
|
|
"""
|
|
|
|
if is_user_input_continuation:
|
|
base += "\nIMPORTANT: The User Agent has just responded. Consider this response in your decision."
|
|
|
|
return base + agents_list + instructions
|
|
|
|
|
|
def _parse_structured_agent_selection(self, moderator_text, available_agents):
|
|
"""
|
|
Attempts to parse a structured agent selection from the moderator's text.
|
|
Falls back to keyword detection if structured format is not found.
|
|
"""
|
|
# First try to find a JSON structure
|
|
import re
|
|
import json
|
|
|
|
# Look for JSON structure in the text
|
|
json_pattern = r'\{[\s\S]*?"next_agent"[\s\S]*?\}'
|
|
json_match = re.search(json_pattern, moderator_text)
|
|
|
|
if json_match:
|
|
try:
|
|
selection = json.loads(json_match.group(0))
|
|
next_agent = selection.get("next_agent")
|
|
|
|
# Handle workflow completion
|
|
if next_agent.lower() in ["workflow_complete", "complete", "end"]:
|
|
return "WORKFLOW_COMPLETE"
|
|
|
|
# Check if the selected agent exists
|
|
if next_agent in available_agents:
|
|
logger.info(f"Successfully parsed structured agent selection: {next_agent}")
|
|
return next_agent
|
|
|
|
# Handle user agent selection explicitly
|
|
if next_agent.lower() in ["user", "user_agent", "human"]:
|
|
return "user_agent"
|
|
|
|
except json.JSONDecodeError:
|
|
logger.warning("Failed to parse JSON from moderator text")
|
|
|
|
# Fallback to the traditional logic with improved reliability
|
|
text = moderator_text.lower()
|
|
|
|
# Check for workflow completion phrases
|
|
workflow_complete_phrases = [
|
|
"workflow beenden - vollständiges ergebnis erreicht",
|
|
"workflow beenden - vollständiges ergebnis",
|
|
"vollständiges ergebnis erreicht",
|
|
"workflow complete",
|
|
"end workflow",
|
|
"complete workflow"
|
|
]
|
|
|
|
# Check if the workflow should be completed
|
|
result_exists = False
|
|
for agent_id, agent in available_agents.items():
|
|
if agent.get("used") and agent.get("last_result_status") == "ERGEBNIS":
|
|
result_exists = True
|
|
break
|
|
|
|
# Check if User Agent has confirmed
|
|
user_agent_confirmed = False
|
|
if "user_agent" in available_agents and available_agents["user_agent"].get("used", False):
|
|
for chat_entry in available_agents["user_agent"].get("chat_entries", []):
|
|
if any(confirmation in chat_entry.lower() for confirmation in
|
|
["ja", "yes", "bestätige", "confirm", "stimme zu", "agree"]):
|
|
user_agent_confirmed = True
|
|
break
|
|
|
|
# Check for explicit workflow completion
|
|
if any(phrase in text for phrase in workflow_complete_phrases):
|
|
if result_exists and user_agent_confirmed:
|
|
return "WORKFLOW_COMPLETE"
|
|
elif not result_exists:
|
|
logger.warning("Moderator attempted to end workflow without complete results")
|
|
elif not user_agent_confirmed:
|
|
logger.warning("Moderator attempted to end workflow without User Agent confirmation")
|
|
if "user_agent" in available_agents:
|
|
return "user_agent" # Force User Agent selection to get confirmation
|
|
|
|
# Check for explicit agent selection
|
|
if "ich wähle" in text or "i choose" in text or "i select" in text:
|
|
for agent_id, agent in available_agents.items():
|
|
agent_name_lower = agent["name"].lower()
|
|
if agent_name_lower in text:
|
|
return agent_id
|
|
|
|
# Check for User Agent queries
|
|
user_agent_phrases = [
|
|
"user agent",
|
|
"benutzer",
|
|
"user",
|
|
"human",
|
|
"ask the user",
|
|
"frage den benutzer"
|
|
]
|
|
|
|
# If text contains User Agent phrases and a question, prioritize User Agent
|
|
has_question = "?" in text
|
|
has_user_phrase = any(phrase in text for phrase in user_agent_phrases)
|
|
|
|
if has_question and has_user_phrase:
|
|
if "user_agent" in available_agents:
|
|
return "user_agent"
|
|
|
|
# Direct name matching as last resort
|
|
for agent_id, agent in available_agents.items():
|
|
agent_name_lower = agent["name"].lower()
|
|
if agent_name_lower in text:
|
|
return agent_id
|
|
|
|
# If a complete result exists but User Agent hasn't confirmed, prioritize User Agent
|
|
if result_exists and not user_agent_confirmed and "user_agent" in available_agents:
|
|
return "user_agent"
|
|
|
|
# If no agent explicitly selected, choose first unused agent
|
|
for agent_id, agent in available_agents.items():
|
|
if not agent["used"]:
|
|
return agent_id
|
|
|
|
# Last resort: reuse first agent
|
|
if available_agents:
|
|
return list(available_agents.keys())[0]
|
|
|
|
return None
|
|
|
|
|
|
def _add_log(
|
|
self,
|
|
workflow_id: str,
|
|
message: str,
|
|
log_type: str,
|
|
agent_id: Optional[str] = None,
|
|
agent_name: Optional[str] = None
|
|
) -> None:
|
|
"""Fügt einen Protokolleintrag zum Workflow hinzu"""
|
|
results.add_log(
|
|
self.workflows,
|
|
workflow_id,
|
|
message,
|
|
log_type,
|
|
agent_id,
|
|
agent_name,
|
|
self.mandate_id,
|
|
self.user_id
|
|
)
|
|
|
|
def get_workflow_status(self, workflow_id: str) -> Optional[Dict[str, Any]]:
|
|
"""Gibt den Status eines Workflows zurück"""
|
|
workflow_status = results.get_workflow_status(self.workflows, workflow_id)
|
|
# Füge Datenstatistik hinzu
|
|
if workflow_status and workflow_id in self.workflows:
|
|
workflow_status["data_stats"] = self.workflows[workflow_id].get("data_stats", {
|
|
"sent_bytes": 0,
|
|
"received_bytes": 0
|
|
})
|
|
# Füge das waiting_for_user-Flag explizit hinzu
|
|
workflow_status["waiting_for_user"] = self.workflows[workflow_id].get("waiting_for_user", False)
|
|
return workflow_status
|
|
|
|
|
|
def get_workflow_logs(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
|
|
"""Gibt die Protokolle eines Workflows zurück"""
|
|
return results.get_workflow_logs(self.workflows, workflow_id)
|
|
|
|
def get_workflow_results(self, workflow_id: str) -> Optional[List[Dict[str, Any]]]:
|
|
"""Gibt die Ergebnisse eines Workflows zurück"""
|
|
return results.get_workflow_results(self.workflows, workflow_id)
|
|
|
|
async def close(self):
|
|
"""Schließt die HTTP-Clients beim Beenden der Anwendung"""
|
|
await self.service_aichat.close()
|
|
await self.service_aiscrap.close()
|
|
|
|
def stop_workflow(self, workflow_id: str) -> bool:
|
|
"""
|
|
Stoppt einen laufenden Workflow.
|
|
|
|
Args:
|
|
workflow_id: ID des zu stoppenden Workflows
|
|
|
|
Returns:
|
|
bool: True, wenn der Workflow erfolgreich gestoppt wurde, False wenn nicht gefunden
|
|
"""
|
|
logger.info(f"Stoppe Workflow {workflow_id}")
|
|
|
|
if workflow_id not in self.workflows:
|
|
logger.warning(f"Workflow {workflow_id} nicht gefunden")
|
|
return False
|
|
|
|
# Prüfen, ob der Workflow bereits beendet ist
|
|
current_status = self.workflows[workflow_id]["status"]
|
|
if current_status not in ["running", "pending"]:
|
|
logger.info(f"Workflow {workflow_id} ist bereits im Status {current_status}")
|
|
return False
|
|
|
|
# Workflow-Status auf "stopped" setzen
|
|
self.workflows[workflow_id]["status"] = "stopped"
|
|
self.workflows[workflow_id]["progress"] = 1.0 # Korrigierter Zugriff auf den Workflow
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Log-Eintrag für das Stoppen hinzufügen
|
|
self._add_log(workflow_id, "Workflow manuell gestoppt", "warning")
|
|
|
|
# Ergebnisse speichern
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
logger.info(f"Workflow {workflow_id} wurde gestoppt")
|
|
return True
|
|
|
|
async def process_user_input(self, workflow_id: str, message: str, additional_files: List[Dict[str, Any]] = None) -> bool:
|
|
"""
|
|
Enhanced function to process user input for a running workflow.
|
|
Ensures user responses are properly tracked and prevents duplicate messages.
|
|
|
|
Args:
|
|
workflow_id: ID of the workflow
|
|
message: User message
|
|
additional_files: List of additional files (optional)
|
|
|
|
Returns:
|
|
bool: True if input was successfully processed
|
|
"""
|
|
if workflow_id not in self.workflows:
|
|
logger.warning(f"Workflow {workflow_id} not found")
|
|
return False
|
|
|
|
# Check if the workflow is waiting for user input
|
|
if not self.workflows[workflow_id].get("waiting_for_user", False):
|
|
logger.warning(f"Workflow {workflow_id} is not waiting for user input")
|
|
return False
|
|
|
|
logger.info(f"Processing user input for workflow {workflow_id}")
|
|
|
|
# Get user information
|
|
user_info = await self._get_user_info(self.user_id)
|
|
user_name = user_info.get("full_name") or user_info.get("username") or f"User {self.user_id}"
|
|
|
|
# Log entry for user input
|
|
self._add_log(
|
|
workflow_id,
|
|
f"User input received: {message[:50]}{'...' if len(message) > 50 else ''}",
|
|
"info",
|
|
"user_agent",
|
|
user_name
|
|
)
|
|
|
|
# Track user agent response in the agents module
|
|
from modules.agentservice_part_agents import track_user_agent_response
|
|
available_agents = self.workflows[workflow_id].get("available_agents", {})
|
|
track_user_agent_response(available_agents, message)
|
|
|
|
# Process additional files if provided
|
|
additional_context = ""
|
|
if additional_files and len(additional_files) > 0:
|
|
file_contexts = file_handling.prepare_file_contexts(additional_files, self.upload_dir)
|
|
file_contents = file_handling.read_file_contents(
|
|
file_contexts,
|
|
self.upload_dir,
|
|
workflow_id,
|
|
lambda wid, msg, typ, aid=None, aname=None: self._add_log(wid, msg, typ, aid, aname),
|
|
self.service_aichat # Pass AI service for image analysis
|
|
)
|
|
|
|
# Format file context for inclusion
|
|
file_context_text = file_handling.format_file_context_text(file_contexts, file_contents)
|
|
additional_context = f"\n\n### Additional Files:\n{file_context_text}"
|
|
|
|
# Log entry for additional files
|
|
self._add_log(
|
|
workflow_id,
|
|
f"{len(additional_files)} additional files added",
|
|
"info",
|
|
"user_agent",
|
|
user_name
|
|
)
|
|
|
|
# Add new files to existing file contexts and contents
|
|
existing_file_contexts = self.workflows[workflow_id].get("file_contexts", [])
|
|
existing_file_contents = self.workflows[workflow_id].get("file_contents", {})
|
|
|
|
# Store existing IDs to avoid duplicates
|
|
existing_ids = {fc["id"] for fc in existing_file_contexts}
|
|
|
|
# Add new file contexts (only if ID doesn't already exist)
|
|
for file_context in file_contexts:
|
|
if file_context["id"] not in existing_ids:
|
|
existing_file_contexts.append(file_context)
|
|
existing_ids.add(file_context["id"])
|
|
|
|
# Add new file contents
|
|
existing_file_contents.update(file_contents)
|
|
|
|
# Update file contexts and contents in workflow
|
|
self.workflows[workflow_id]["file_contexts"] = existing_file_contexts
|
|
self.workflows[workflow_id]["file_contents"] = existing_file_contents
|
|
|
|
# Combined message (user message + additional files)
|
|
combined_message = message + additional_context
|
|
|
|
# Check for duplicate user messages in chat history
|
|
# This prevents the user's message from appearing twice
|
|
chat_history = self.workflows[workflow_id].get("chat_history", [])
|
|
last_message = chat_history[-1] if chat_history else None
|
|
|
|
# Only add user message if the last message wasn't already from the user with the same content
|
|
is_duplicate = False
|
|
if last_message and last_message.get("role") == "user" and last_message.get("content", "").startswith(f"[User Agent: {user_name}]"):
|
|
# Check for content similarity to avoid duplicates
|
|
last_content = last_message.get("content", "").replace(f"[User Agent: {user_name}] ", "", 1)
|
|
if last_content == message:
|
|
is_duplicate = True
|
|
logger.warning(f"Detected duplicate user message, skipping addition to chat history")
|
|
|
|
# Add user message to chat history if not duplicate
|
|
if not is_duplicate:
|
|
chat_history.append({
|
|
"role": "user",
|
|
"content": f"[User Agent: {user_name}] {message}"
|
|
})
|
|
|
|
# Update chat history in workflow
|
|
self.workflows[workflow_id]["chat_history"] = chat_history
|
|
|
|
# Estimate sent bytes
|
|
message_size = len(combined_message)
|
|
self.workflows[workflow_id]["data_stats"]["sent_bytes"] += message_size
|
|
|
|
# Save the current state before continuing
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
# Start a new task to continue the workflow
|
|
asyncio.create_task(
|
|
self._continue_workflow_after_user_input(
|
|
workflow_id,
|
|
combined_message,
|
|
user_name
|
|
)
|
|
)
|
|
|
|
return True
|
|
|
|
|
|
|
|
async def _continue_workflow_after_user_input(self, workflow_id: str, user_message: str, user_name: str) -> None:
|
|
"""
|
|
Enhanced function to continue a workflow after user input.
|
|
Ensures proper state tracking and prevents duplicate processing.
|
|
|
|
Args:
|
|
workflow_id: ID of the workflow
|
|
user_message: User message
|
|
user_name: User name
|
|
"""
|
|
if workflow_id not in self.workflows:
|
|
logger.warning(f"Workflow {workflow_id} not found")
|
|
return
|
|
|
|
# Update workflow status
|
|
self.workflows[workflow_id]["status"] = "running"
|
|
|
|
# Log entry for continuation
|
|
self._add_log(
|
|
workflow_id,
|
|
"Workflow continues after user input",
|
|
"info"
|
|
)
|
|
|
|
# Get required data from workflow
|
|
chat_history = self.workflows[workflow_id].get("chat_history", [])
|
|
available_agents = self.workflows[workflow_id].get("available_agents", {})
|
|
file_contexts = self.workflows[workflow_id].get("file_contexts", [])
|
|
file_contents = self.workflows[workflow_id].get("file_contents", {})
|
|
|
|
# Create user result (store agent result in the workflow)
|
|
user_result = results.create_agent_result(
|
|
workflow_id,
|
|
{"id": "user_agent", "name": "User Agent", "type": "user"},
|
|
len(self.workflows[workflow_id].get("results", [])),
|
|
"User input",
|
|
file_contexts,
|
|
user_message,
|
|
self.mandate_id,
|
|
self.user_id
|
|
)
|
|
|
|
self.workflows[workflow_id]["results"].append(user_result)
|
|
|
|
# Mark User Agent as used
|
|
if "user_agent" in available_agents:
|
|
available_agents["user_agent"]["used"] = True
|
|
|
|
# Mark that we're no longer waiting for user input - CRITICAL
|
|
self.workflows[workflow_id]["waiting_for_user"] = False
|
|
|
|
# Get current round and max rounds
|
|
current_round = self.workflows[workflow_id].get("current_round", 0)
|
|
max_rounds = 12 # Same value as in execute_workflow
|
|
|
|
# Check if we've reached the maximum number of rounds
|
|
if current_round >= max_rounds:
|
|
self._add_log(workflow_id, f"Workflow automatically ended after {max_rounds} rounds and user input", "info")
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Save results
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
return
|
|
|
|
# Start next round
|
|
current_round += 1
|
|
self.workflows[workflow_id]["current_round"] = current_round
|
|
self._add_log(workflow_id, f"Starting round {current_round} after user input", "info")
|
|
|
|
# Run a moderator cycle
|
|
workflow_complete, waiting_for_user_input, selected_agent_id = await self._run_moderator_cycle(
|
|
workflow_id,
|
|
chat_history,
|
|
available_agents,
|
|
file_contexts,
|
|
file_contents,
|
|
is_user_input_continuation=True,
|
|
user_message=user_message
|
|
)
|
|
|
|
# Update workflow state based on moderator cycle result
|
|
if workflow_complete:
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self._add_log(workflow_id, "Workflow successfully completed after user input", "success")
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
elif waiting_for_user_input:
|
|
# User Agent was selected again, we're waiting for more input
|
|
pass
|
|
elif current_round < max_rounds:
|
|
# Continue with more agent rounds if not waiting for input
|
|
asyncio.create_task(
|
|
self._continue_workflow_after_moderator_cycle(
|
|
workflow_id,
|
|
selected_agent_id
|
|
)
|
|
)
|
|
|
|
# Update progress
|
|
if not self.workflows[workflow_id]["completed_at"]:
|
|
self.workflows[workflow_id]["progress"] = min(0.9, 0.1 + (current_round / max_rounds) * 0.8)
|
|
|
|
# Save results
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
|
|
async def _continue_workflow_after_moderator_cycle(self, workflow_id: str, selected_agent_id: str) -> None:
|
|
"""
|
|
New method to continue workflow after a moderator cycle when not waiting for user input.
|
|
This facilitates smoother agent transitions and addresses the issue of the workflow not continuing properly.
|
|
|
|
Args:
|
|
workflow_id: ID of the workflow
|
|
selected_agent_id: ID of the selected agent
|
|
"""
|
|
if workflow_id not in self.workflows:
|
|
logger.warning(f"Workflow {workflow_id} not found")
|
|
return
|
|
|
|
# Get required data from workflow
|
|
chat_history = self.workflows[workflow_id].get("chat_history", [])
|
|
available_agents = self.workflows[workflow_id].get("available_agents", {})
|
|
file_contexts = self.workflows[workflow_id].get("file_contexts", [])
|
|
file_contents = self.workflows[workflow_id].get("file_contents", {})
|
|
|
|
# Get current round and max rounds
|
|
current_round = self.workflows[workflow_id].get("current_round", 0)
|
|
max_rounds = 12 # Same value as in execute_workflow
|
|
|
|
# Check if we've reached the maximum number of rounds
|
|
if current_round >= max_rounds:
|
|
self._add_log(workflow_id, f"Workflow automatically ended after {max_rounds} rounds", "info")
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Save results
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
return
|
|
|
|
# Start next round
|
|
current_round += 1
|
|
self.workflows[workflow_id]["current_round"] = current_round
|
|
self._add_log(workflow_id, f"Starting round {current_round} (continuation)", "info")
|
|
|
|
# Run another moderator cycle
|
|
workflow_complete, waiting_for_user_input, new_selected_agent_id = await self._run_moderator_cycle(
|
|
workflow_id,
|
|
chat_history,
|
|
available_agents,
|
|
file_contexts,
|
|
file_contents,
|
|
is_user_input_continuation=False
|
|
)
|
|
|
|
# Update workflow state based on moderator cycle result
|
|
if workflow_complete:
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self._add_log(workflow_id, "Workflow successfully completed", "success")
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
elif waiting_for_user_input:
|
|
# User Agent was selected, we're waiting for input
|
|
pass
|
|
elif current_round < max_rounds:
|
|
# Continue with more agent rounds if not waiting for input and not at max rounds
|
|
asyncio.create_task(
|
|
self._continue_workflow_after_moderator_cycle(
|
|
workflow_id,
|
|
new_selected_agent_id
|
|
)
|
|
)
|
|
else:
|
|
# Max rounds reached
|
|
self.workflows[workflow_id]["status"] = "completed"
|
|
self._add_log(workflow_id, f"Workflow completed after reaching maximum rounds ({max_rounds})", "info")
|
|
self.workflows[workflow_id]["progress"] = 1.0
|
|
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
|
|
|
|
# Update progress
|
|
if not self.workflows[workflow_id]["completed_at"]:
|
|
self.workflows[workflow_id]["progress"] = min(0.9, 0.1 + (current_round / max_rounds) * 0.8)
|
|
|
|
# Save results
|
|
results.save_workflow_results(self.workflows, workflow_id, self.results_dir)
|
|
|
|
|
|
async def _get_user_info(self, user_id: int) -> Dict[str, Any]:
|
|
"""
|
|
Ruft Benutzerinformationen aus der Datenbank ab.
|
|
|
|
Args:
|
|
user_id: ID des Benutzers
|
|
|
|
Returns:
|
|
Dict mit Benutzerinformationen
|
|
"""
|
|
try:
|
|
# Hier würden wir normalerweise die Benutzerinformationen aus einer Datenbank abrufen
|
|
# Für diese Implementierung verwenden wir Platzhalter
|
|
return {
|
|
"id": user_id,
|
|
"username": f"user_{user_id}",
|
|
"full_name": f"Benutzer {user_id}"
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Abrufen der Benutzerinformationen: {str(e)}")
|
|
return {"id": user_id}
|
|
|
|
async def _sanitize_messages(self, messages):
|
|
"""Sanitizes all messages to prevent API errors."""
|
|
if not messages:
|
|
return messages
|
|
|
|
sanitized = []
|
|
for message in messages:
|
|
# Create a deep copy of the message
|
|
sanitized_message = message.copy() if isinstance(message, dict) else message
|
|
|
|
if isinstance(sanitized_message, dict) and "content" in sanitized_message:
|
|
sanitized_message["content"] = self._sanitize_message_content(sanitized_message["content"])
|
|
|
|
sanitized.append(sanitized_message)
|
|
|
|
return sanitized
|
|
|
|
def _sanitize_message_content(self, content):
|
|
"""Ensures content has no trailing whitespace."""
|
|
if isinstance(content, str):
|
|
return content.rstrip()
|
|
|
|
# If content is a list (for multimodal messages), process each item
|
|
if isinstance(content, list):
|
|
return [
|
|
{**item, 'text': item['text'].rstrip() if isinstance(item.get('text'), str) else item.get('text')}
|
|
if isinstance(item, dict) and item.get('type') == 'text'
|
|
else item
|
|
for item in content
|
|
]
|
|
|
|
return content
|
|
|
|
def _check_for_user_agent_query(self, message_content: str) -> bool:
|
|
"""
|
|
Prüft, ob ein Text eine Anfrage an den User-Agent enthält.
|
|
|
|
Args:
|
|
message_content: Der zu prüfende Text
|
|
|
|
Returns:
|
|
bool: True, wenn der Text eine Anfrage an den User-Agent enthält
|
|
"""
|
|
# Keine Prüfung, wenn der Text leer ist
|
|
if not message_content:
|
|
return False
|
|
|
|
# Prüfmuster für User-Agent-Anfragen
|
|
user_agent_phrases = [
|
|
'User Agent',
|
|
'Benutzer',
|
|
'Was denken Sie',
|
|
'Was denkst du',
|
|
'Ihre Meinung',
|
|
'deine Meinung',
|
|
'Sind Sie zufrieden',
|
|
'Gibt es weitere Fragen',
|
|
'Wollen Sie',
|
|
'Möchten Sie',
|
|
'Kannst du',
|
|
'Können Sie',
|
|
'Könnten Sie',
|
|
'Bitte teilen Sie uns mit',
|
|
'Moderator zu User Agent'
|
|
]
|
|
|
|
# Text in Kleinbuchstaben umwandeln für case-insensitive Suche
|
|
content_lower = message_content.lower()
|
|
|
|
# Prüfen, ob ein Fragezeichen enthalten ist
|
|
has_question_mark = '?' in message_content
|
|
|
|
# Prüfen, ob eine der Phrasen enthalten ist
|
|
has_user_agent_phrase = any(phrase.lower() in content_lower for phrase in user_agent_phrases)
|
|
|
|
# Spezielle Bedingungen für User-Agent-Anfragen
|
|
is_user_agent_query = (
|
|
(has_question_mark and has_user_agent_phrase) or
|
|
('user agent' in content_lower and 'workflow' in content_lower) or
|
|
'moderator zu user agent' in content_lower
|
|
)
|
|
return is_user_agent_query
|
|
|
|
|
|
|
|
|
|
# Singleton-Factory für AgentService-Instanzen pro Kontext
|
|
_agent_service_instances = {}
|
|
|
|
def get_agentservice_interface(mandate_id: int = None, user_id: int = None) -> AgentService:
|
|
"""
|
|
Gibt eine AgentService-Instanz für den angegebenen Kontext zurück.
|
|
Wiederverwendet bestehende Instanzen.
|
|
"""
|
|
context_key = f"{mandate_id}_{user_id}"
|
|
if context_key not in _agent_service_instances:
|
|
_agent_service_instances[context_key] = AgentService(mandate_id, user_id)
|
|
return _agent_service_instances[context_key]
|
|
|