gateway/gwserver/modules/agentservice_part_results.py

214 lines
No EOL
6.9 KiB
Python

import os
import json
import logging
import uuid
from datetime import datetime
from typing import Dict, Any, List, Optional
# Logger konfigurieren
logger = logging.getLogger(__name__)
def create_agent_result(
workflow_id: str,
agent: Dict[str, Any],
index: int,
prompt: str,
file_contexts: List[Dict[str, Any]],
content: str,
mandate_id: int = None,
user_id: int = None
) -> Dict[str, Any]:
"""
Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort.
Diese Version berücksichtigt die Status-Deklaration des Agenten.
Args:
workflow_id: ID des Workflows
agent: Agent-Informationen
index: Index des Ergebnisses
prompt: Ursprünglicher Prompt
file_contexts: Liste der Dateikontexte
content: Inhalt der Agent-Antwort
mandate_id: Optional ID des Mandanten
user_id: Optional ID des Benutzers
Returns:
Ein Ergebnisobjekt für den Workflow
"""
# Importiere die Hilfsfunktionen aus dem agents-Modul
from modules.agentservice_part_agents import update_agent_results_with_status, extract_summary
agent_type = agent["type"]
agent_id = agent["id"]
agent_name = agent["name"]
# Extrahiere den Status aus dem Ergebnis
cleaned_content, result_status = update_agent_results_with_status(content)
# Speichere den Status im Agenten für späteren Zugriff
agent["last_result_status"] = result_status
# Entferne Agent-Präfix aus der Antwort, falls vorhanden
agent_prefix = f"[Agent: {agent_name}]"
if cleaned_content.startswith(agent_prefix):
cleaned_content = cleaned_content[len(agent_prefix):].strip()
# Titel basierend auf Agent-Type und deklariertem Status erstellen
title_prefix = {
"ERGEBNIS": "Ergebnis",
"TEILWEISE": "Teilweises Ergebnis",
"PLAN": "Vorgeschlagener Plan",
"UNBEKANNT": "Antwort"
}.get(result_status, "Antwort")
# Titel basierend auf Agent-Attributen erstellen
agent_desc = agent.get("description", f"Agent {agent_name}")
title = f"{title_prefix}: {agent_desc}"
# Extrahiere Dateinamen für die Metadaten
file_names = [file["name"] for file in file_contexts]
# Grundlegende Ergebnisstruktur
result = {
"id": f"result_{workflow_id}_{index}",
"mandate_id": mandate_id,
"user_id": user_id,
"agent_id": agent_id,
"agent_name": agent_name,
"timestamp": datetime.now().isoformat(),
"type": "text", # Standardtyp
"title": title,
"content": cleaned_content,
"summary": extract_summary(cleaned_content, max_length=200),
"metadata": {
"files_processed": file_names,
"prompt": prompt,
"agent_type": agent_type,
"result_status": result_status,
"capabilities": agent.get("capabilities", "")
}
}
return result
def add_log(
workflows: Dict[str, Dict[str, Any]],
workflow_id: str,
message: str,
log_type: str,
agent_id: Optional[str] = None,
agent_name: Optional[str] = None,
mandate_id: int = None,
user_id: int = None
) -> None:
"""
Fügt einen Protokolleintrag zum Workflow hinzu
Args:
workflows: Dictionary mit Workflow-Informationen
workflow_id: ID des Workflows
message: Log-Nachricht
log_type: Typ des Logs (info, warning, error, etc.)
agent_id: Optional ID des Agenten
agent_name: Optional Name des Agenten
mandate_id: Optional ID des Mandanten
user_id: Optional ID des Benutzers
"""
log_entry = {
"id": f"log_{uuid.uuid4()}",
"mandate_id": mandate_id,
"user_id": user_id,
"message": message,
"type": log_type,
"timestamp": datetime.now().isoformat(),
"agent_id": agent_id,
"agent_name": agent_name
}
workflow = workflows.get(workflow_id)
if workflow:
workflow["logs"].append(log_entry)
logger.info(f"Workflow {workflow_id}: {message}")
else:
logger.warning(f"Versuch, einem nicht existierenden Workflow Logs hinzuzufügen: {workflow_id}")
def save_workflow_results(workflows: Dict[str, Dict[str, Any]], workflow_id: str, results_dir: str) -> None:
"""
Speichert die Workflow-Ergebnisse in einer Datei
Args:
workflows: Dictionary mit Workflow-Informationen
workflow_id: ID des Workflows
results_dir: Verzeichnis für Ergebnisse
"""
workflow = workflows.get(workflow_id)
if workflow:
try:
file_path = os.path.join(results_dir, f"workflow_{workflow_id}.json")
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(workflow, f, indent=2, ensure_ascii=False)
logger.info(f"Workflow-Ergebnisse gespeichert: {file_path}")
except Exception as e:
logger.error(f"Fehler beim Speichern der Workflow-Ergebnisse: {e}")
else:
logger.error(f"Workflow nicht gefunden: {workflow_id}")
def get_workflow_status(workflows: Dict[str, Dict[str, Any]], workflow_id: str) -> Optional[Dict[str, Any]]:
"""
Gibt den Status eines Workflows zurück
Args:
workflows: Dictionary mit Workflow-Informationen
workflow_id: ID des Workflows
Returns:
Ein Dictionary mit Statusinformationen oder None, wenn der Workflow nicht existiert
"""
workflow = workflows.get(workflow_id)
if not workflow:
return None
return {
"id": workflow["id"],
"mandate_id": workflow.get("mandate_id"),
"user_id": workflow.get("user_id"),
"status": workflow["status"],
"progress": workflow["progress"],
"started_at": workflow["started_at"],
"completed_at": workflow["completed_at"],
"agent_statuses": workflow["agent_statuses"]
}
def get_workflow_logs(workflows: Dict[str, Dict[str, Any]], workflow_id: str) -> Optional[List[Dict[str, Any]]]:
"""
Gibt die Protokolle eines Workflows zurück
Args:
workflows: Dictionary mit Workflow-Informationen
workflow_id: ID des Workflows
Returns:
Eine Liste mit Logs oder None, wenn der Workflow nicht existiert
"""
workflow = workflows.get(workflow_id)
if not workflow:
return None
return workflow["logs"]
def get_workflow_results(workflows: Dict[str, Dict[str, Any]], workflow_id: str) -> Optional[List[Dict[str, Any]]]:
"""
Gibt die Ergebnisse eines Workflows zurück
Args:
workflows: Dictionary mit Workflow-Informationen
workflow_id: ID des Workflows
Returns:
Eine Liste mit Ergebnissen oder None, wenn der Workflow nicht existiert
"""
workflow = workflows.get(workflow_id)
if not workflow:
return None
return workflow["results"]