img extraction direct and from pdf in all modes

This commit is contained in:
valueon 2025-03-25 21:23:21 +01:00
parent d7cea0398f
commit 78c583d500
11 changed files with 617 additions and 661 deletions

View file

@ -13,12 +13,23 @@
{
"mandate_id": 1,
"user_id": 1,
"name": "test.pdf",
"name": "test_bild.pdf",
"type": "document",
"content_type": "application/pdf",
"size": 299729,
"path": "./_uploads\\c30faecc-c041-4225-805f-741328a04bba.pdf",
"upload_date": "2025-03-24T16:58:50.089708",
"path": "./_uploads\\b73afc5a-131a-4db8-9572-be2a00c71d91.pdf",
"upload_date": "2025-03-25T17:42:03.548097",
"id": 2
},
{
"mandate_id": 1,
"user_id": 1,
"name": "20240419_093309.jpg",
"type": "image",
"content_type": "image/jpeg",
"size": 286163,
"path": "./_uploads\\46a65f6f-a30c-4cf2-b0e2-c24709c3282e.jpg",
"upload_date": "2025-03-25T18:52:16.203894",
"id": 3
}
]

View file

@ -29,7 +29,7 @@ import modules.gateway_model as gateway_model
# Konfiguration des Loggers
logging.basicConfig(
level=logging.INFO,
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.StreamHandler()]
)

View file

@ -24,7 +24,7 @@ DEBUG = True
UPLOAD_DIR = ./_uploads
RESULTS_DIR = ./_results
MAX_HISTORY = 50
AI_PROVIDER = anthropic ; openai oder anthropic
AI_PROVIDER = openai ; openai oder anthropic
[Connector_AiOpenai]
API_KEY = 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

View file

@ -1,5 +1,7 @@
import io
import os
import json
import uuid
import logging
import httpx
import base64
@ -7,7 +9,10 @@ import mimetypes
from typing import Dict, Any, List, Optional
from fastapi import HTTPException
import configload as configload
import pandas as pd
from PIL import Image
import PyPDF2
import fitz
# Logger konfigurieren
logger = logging.getLogger(__name__)
@ -91,72 +96,173 @@ class ChatService:
logger.error(f"Fehler beim Aufruf der OpenAI API: {str(e)}")
raise HTTPException(status_code=500, detail=f"Fehler beim Aufruf der OpenAI API: {str(e)}")
def prepare_file_message_content(self, prompt_text: str, file_paths: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
async def close(self):
"""Schließt den HTTP-Client beim Beenden der Anwendung"""
await self.http_client.close()
async def analyze_image(self, image_path: str, prompt: str) -> str:
"""
Bereitet eine Nachricht mit Dateien für OpenAI API vor.
Analysiert ein Bild mit der OpenAI Vision API.
Args:
prompt_text: Der Text-Prompt
file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path)
image_path: Pfad zum Bild
prompt: Der Prompt für die Analyse
Returns:
Eine für OpenAI-API formatierte content-Liste
Die Antwort der OpenAI Vision API als Text
"""
message_content = [
{
"type": "text",
"text": prompt_text
}
]
# Füge Dateien als Base64-Anhänge hinzu
for file_info in file_paths:
file_path = file_info.get("path", "")
if file_path and os.path.exists(file_path):
try:
# Datei als Base64 codieren
with open(file_path, "rb") as f:
file_data = f.read()
base64_data = base64.b64encode(file_data).decode('utf-8')
# MIME-Typ bestimmen
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Bei OpenAI werden Bilder anders behandelt als bei Anthropic
if mime_type.startswith("image/"):
message_content.append({
try:
# Bild einlesen und als Base64 kodieren
with open(image_path, "rb") as f:
image_data = f.read()
# In Base64-String konvertieren
base64_data = base64.b64encode(image_data).decode('utf-8')
# MIME-Typ bestimmen
mime_type, _ = mimetypes.guess_type(image_path)
if not mime_type:
# Standard ist PNG, wenn Typ nicht bestimmt werden kann
mime_type = "image/png"
# Vision-fähiges Modell für die Anfrage verwenden
vision_model = "gpt-4o" # oder aus der Konfiguration holen
# Bereite den Payload für die Vision API vor
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{base64_data}"
}
})
else:
# Für nicht-Bilder werden sie als Textbeschreibung hinzugefügt
message_content.append({
"type": "text",
"text": f"[Datei: {file_info.get('name', 'Unbekannt')} (Typ: {mime_type})]"
})
logger.info(f"Datei {file_info.get('name', 'Unbekannt')} als Anhang hinzugefügt")
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Datei {file_info.get('name', 'Unbekannt')}: {str(e)}")
}
]
}
]
# Verwende die bestehende call_api Funktion mit dem Vision-Modell
response = await self.call_api(messages)
# Inhalt extrahieren und zurückgeben
return response["choices"][0]["message"]["content"]
return message_content
def parse_filedata(self, file_paths: List[Dict[str, Any]], prompt_text: str = "", file_contents: Dict[str, str] = None) -> Dict[str, Any]:
except Exception as e:
logger.error(f"Fehler bei der Bildanalyse {image_path}: {str(e)}", exc_info=True)
return f"[Fehler bei der Bildanalyse: {str(e)}]"
async def extract_and_analyze_pdf_images(self, pdf_path: str, prompt: str) -> List[Dict[str, Any]]:
"""
Bereitet Dateien für die OpenAI API vor und erstellt ein einheitliches Message-Objekt.
Extrahiert Bilder aus einer PDF-Datei mit PyMuPDF und analysiert sie mit der Vision API.
Args:
file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path)
prompt_text: Der Text-Prompt, der zusammen mit den Dateien gesendet werden soll
file_contents: Optional vorgelesene Dateiinhalte als Dict[file_id, content]
pdf_path: Pfad zur PDF-Datei
prompt: Der Prompt für die Bildanalyse
Returns:
Ein standardisiertes Message-Objekt, das für beide API-Typen verwendet werden kann
Eine Liste mit Analyseergebnissen für jedes Bild
"""
import uuid
import os
image_responses = []
try:
# PDF öffnen
doc = fitz.open(pdf_path)
logger.info(f"PDF geöffnet: {pdf_path} mit {len(doc)} Seiten")
for page_num, page in enumerate(doc, 1):
# Alle Bilder auf der Seite finden
image_list = page.get_images(full=True)
if image_list:
logger.info(f"Seite {page_num}: {len(image_list)} Bilder gefunden")
for img_index, img in enumerate(image_list):
try:
# Bild-Referenz
xref = img[0]
# Bild und Metadaten extrahieren
base_image = doc.extract_image(xref)
image_bytes = base_image["image"] # Tatsächliche Bilddaten
image_ext = base_image["ext"] # Dateiendung (jpg, png, etc.)
# Speichere als temporäre Datei
unique_id = uuid.uuid4()
temp_img_path = f"temp_img_{page_num}_{img_index}_{unique_id}.{image_ext}"
with open(temp_img_path, "wb") as img_file:
img_file.write(image_bytes)
logger.debug(f"Bild temporär gespeichert: {temp_img_path}")
# Analysiere mit Vision API
try:
analysis_result = await self.analyze_image(temp_img_path, prompt)
logger.debug(f"Bildanalyse für Bild {img_index} auf Seite {page_num} abgeschlossen")
except Exception as analyze_error:
logger.error(f"Fehler bei der Bildanalyse: {str(analyze_error)}")
analysis_result = f"[Fehler bei der Bildanalyse: {str(analyze_error)}]"
# Ergebnis speichern
try:
# Versuche zuerst, die Größe aus base_image zu bekommen
if 'width' in base_image and 'height' in base_image:
image_size = f"{base_image['width']}x{base_image['height']}"
else:
# Alternative: Öffne das temporäre Bild, um die Größe zu bestimmen
with Image.open(temp_img_path) as img:
width, height = img.size
image_size = f"{width}x{height}"
except Exception as e:
logger.warning(f"Konnte Bildgröße nicht ermitteln: {str(e)}")
image_size = "unbekannt"
image_responses.append({
"page": page_num,
"image_index": img_index,
"format": image_ext,
"image_size": image_size, # Der key wird immer gesetzt, entweder mit tatsächlicher Größe oder "unbekannt"
"response": analysis_result
})
# Temporäre Datei löschen
try:
if os.path.exists(temp_img_path):
os.remove(temp_img_path)
logger.debug(f"Temporäre Bilddatei entfernt: {temp_img_path}")
except Exception as cleanup_error:
logger.warning(f"Temporäre Datei konnte nicht entfernt werden: {cleanup_error}")
except Exception as e:
logger.warning(f"Fehler bei der Extraktion von Bild {img_index} auf Seite {page_num}: {str(e)}")
continue
logger.info(f"Extrahiert und analysiert: {len(image_responses)} Bilder aus PDF {os.path.basename(pdf_path)}")
except ImportError:
logger.error("PyMuPDF (fitz) ist nicht installiert. Installiere es mit 'pip install pymupdf'")
except Exception as e:
logger.error(f"Fehler beim Extrahieren von PDF-Bildern: {str(e)}")
return image_responses
async def parse_filedata(self, file_contexts: List[Dict[str, Any]], prompt_text: str, file_contents: Dict[str, str] = None) -> Dict[str, Any]:
"""
Bereitet eine vollständige Nachricht mit allen Dateiinhalten für das AI-Modell vor.
Args:
file_contexts: Liste der Dateikontexte mit Metadaten
prompt_text: Der Text-Prompt
file_contents: Dictionary mit bereits geladenen Dateiinhalten
Returns:
Eine vollständig formatierte Nachricht für das AI-Modell
"""
# Basisstruktur für die Nachricht in OpenAI-Format
message = {
@ -171,101 +277,116 @@ class ChatService:
"text": prompt_text
})
if not file_contents:
file_contents = {}
# Dateien als Anhänge hinzufügen
for file_info in file_paths:
for file_info in file_contexts:
file_path = file_info.get("path", "")
file_name = file_info.get("name", "")
file_id = file_info.get("id", "")
file_type = file_info.get("type", "")
# Prüfen, ob Dateiinhalt bereits vorhanden ist
if file_contents and file_id in file_contents:
# Bereits verarbeiteten Inhalt verwenden
if file_id in file_contents:
content = file_contents[file_id]
# Problematische Unicode-Zeichen ersetzen
if isinstance(content, str):
content = content.encode('utf-8', errors='replace').decode('utf-8')
else:
content = str(content)
# Besondere Verarbeitung für PDF-Dateien mit Bildern
if file_name.endswith('.pdf') and file_path and os.path.exists(file_path):
try:
# Bildanalyse der PDF durchführen
image_prompt = prompt_text or "Beschreibe detailliert, was du in diesem Bild siehst."
image_responses = await self.extract_and_analyze_pdf_images(file_path, image_prompt)
# Nur wenn Bilder gefunden wurden, füge die Analyse hinzu
if image_responses:
image_details = "\n\n".join([
f"Bild auf Seite {resp['page']} (Größe: {resp['image_size']}): {resp['response']}"
for resp in image_responses
])
logger.debug("Image description: "+image_details)
message["content"].append({
"type": "text",
"text": f"PDF-Bildanalyse für {file_name}:\n{image_details}"
})
except Exception as e:
logger.error(f"Fehler bei der PDF-Bildanalyse für {file_name}: {str(e)}")
# Text zur Nachricht hinzufügen
message["content"].append({
"type": "text",
"text": f"Datei: {file_name}\n{file_contents[file_id]}"
"text": f"--- DATEI: {file_name} ---\n\n{content}"
})
logger.info(f"Vorverarbeiteter Inhalt für Datei {file_name} verwendet")
logger.info(f"Inhalt für Datei {file_name} zur Nachricht hinzugefügt")
continue
# Sonst Datei direkt verarbeiten
if file_path and os.path.exists(file_path):
# Wenn kein Inhalt vorhanden ist, füge einen Hinweis hinzu
if not file_path or not os.path.exists(file_path):
message["content"].append({
"type": "text",
"text": f"[Datei {file_name} wurde nicht gefunden oder ist nicht zugänglich]"
})
logger.warning(f"Datei {file_name} nicht gefunden: {file_path}")
continue
# Direktes Hinzufügen von Bildern bei OpenAI (für Bilder außerhalb von PDFs)
if file_type == "image" or file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')):
try:
# Datei einlesen
# Bild als Base64 kodieren
with open(file_path, "rb") as f:
file_data = f.read()
# MIME-Typ bestimmen mit python-magic, wenn verfügbar
try:
import magic
mime_type = magic.from_buffer(file_data, mime=True)
except ImportError:
# Fallback auf mimetypes, wenn python-magic nicht verfügbar ist
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
# Fallback auf Dateierweiterung
extension = os.path.splitext(file_path)[1].lower()[1:]
mime_type = get_mime_type_from_extension(extension)
# Content-Type bestimmen für OpenAI
if mime_type.startswith("image/"):
# Bild als Base64 kodieren
base64_data = base64.b64encode(file_data).decode('utf-8')
# MIME-Typ bestimmen
mime_type, _ = mimetypes.guess_type(file_path)
if not mime_type:
mime_type = "application/octet-stream"
# Bild zur Nachricht hinzufügen
if mime_type.startswith("image/"):
# Füge zunächst die Bildanalyse als Text hinzu
image_prompt = prompt_text or "Beschreibe detailliert, was du in diesem Bild siehst."
analysis_result = await self.analyze_image(file_path, image_prompt)
message["content"].append({
"type": "text",
"text": f"Bildanalyse für {file_name}:\n{analysis_result}"
})
# Dann füge das Bild selbst hinzu
message["content"].append({
"type": "image_url",
"image_url": {
"url": f"data:{mime_type};base64,{base64_data}"
}
})
else:
# Für nicht-Bild-Dateien als Text hinzufügen
try:
# Textdateien direkt als Text extrahieren
if mime_type in ["text/plain", "text/csv", "application/json", "text/html", "application/xml"]:
message["content"].append({
"type": "text",
"text": f"[Datei: {file_name}]\n{file_data.decode('utf-8', errors='replace')}"
})
else:
# Bei Binärdateien nur einen Hinweis einfügen
message["content"].append({
"type": "text",
"text": f"[Datei: {file_name} (Typ: {mime_type}) ist verfügbar, aber kann nicht direkt angezeigt werden]"
})
except UnicodeDecodeError:
# Bei Decodierungsfehlern nur einen Hinweis einfügen
message["content"].append({
"type": "text",
"text": f"[Datei: {file_name} (Typ: {mime_type}) ist binär und kann nicht direkt angezeigt werden]"
})
logger.info(f"Datei {file_name} zum OpenAI-Nachrichtenobjekt hinzugefügt")
logger.info(f"Bild {file_name} analysiert und zur Nachricht hinzugefügt")
except Exception as e:
logger.error(f"Fehler beim Hinzufügen der Datei {file_name}: {str(e)}")
logger.error(f"Fehler beim Hinzufügen des Bildes {file_name}: {str(e)}")
message["content"].append({
"type": "text",
"text": f"[Fehler beim Laden der Datei {file_name}: {str(e)}]"
"text": f"[Fehler beim Laden des Bildes {file_name}: {str(e)}]"
})
else:
# Datei nicht gefunden - Hinweis einfügen
message["content"].append({
"type": "text",
"text": f"[Datei {file_name} nicht verfügbar]"
})
# Prüfe, ob wir ein leeres content-Array haben und wandle es in einen String um
# Optimiere das Message-Format für die API basierend auf dem Inhalt
if not message["content"]:
# Leerer Inhalt - setze auf leeren String
message["content"] = prompt_text or ""
elif len(message["content"]) == 1 and message["content"][0]["type"] == "text":
# Wenn nur ein Text-Element vorhanden ist, vereinfachen wir die Struktur
# Nur ein Text-Element - vereinfache die Struktur
message["content"] = message["content"][0]["text"]
# Bei komplizierten Inhalten (Mischung aus Text und Bildern) wird das Array-Format beibehalten
logger.debug(f"Message-Objekt für OpenAI erstellt mit {len(message['content']) if isinstance(message['content'], list) else 'String'}-Inhalt")
return message
async def close(self):
"""Schließt den HTTP-Client beim Beenden der Anwendung"""
await self.http_client.aclose()
def get_mime_type_from_extension(extension: str) -> str:
"""

View file

@ -9,6 +9,7 @@ import configload as configload
import urllib.parse
import time
import random
import pandas as pd
# Logger konfigurieren
logger = logging.getLogger(__name__)

View file

@ -101,8 +101,8 @@ class AgentService:
Anstatt die Agenten der Reihe nach abzuarbeiten, wird ein AI-Moderator verwendet,
der die Agenten basierend auf ihren Fähigkeiten und bisherigen Antworten steuert.
Verwendet parse_filedata aus dem jeweiligen Connector für die Verarbeitung von Dateien,
wobei vorverarbeitete Dateiinhalte effizient wiederverwendet werden.
Verwendet file_handling.prepare_message_for_ai für die Vorbereitung der Nachrichten,
wobei alle Dateiinhalte vollständig gelesen werden.
"""
logger.info(f"Starte Workflow {workflow_id} mit {len(agents_list)} Agenten und {len(files)} Dateien")
@ -126,7 +126,9 @@ class AgentService:
# Dateikontexte vorbereiten
file_contexts = file_handling.prepare_file_contexts(files, self.upload_dir)
for fc in file_contexts:
logger.debug(f"Dateikontext: ID={fc['id']}, Name={fc['name']}, Typ={fc.get('type', 'unbekannt')}")
# Dateiinhalte lesen - EINMAL für den gesamten Workflow
file_contents = file_handling.read_file_contents(
file_contexts,
@ -145,11 +147,12 @@ class AgentService:
chat_history = []
# Erstelle das standardisierte Nachrichtenobjekt für die initialen Dateien und den Prompt
# Verwende parse_filedata aus dem Connector und übergebe die vorverarbeiteten Dateiinhalte
initial_message = self.service_aichat.parse_filedata(
# Verwende file_handling.prepare_message_for_ai mit dem Service
initial_message = await file_handling.prepare_message_for_ai(
file_contexts,
prompt,
file_contents # Übergebe die vorverarbeiteten Inhalte
file_contents,
self.service_aichat
)
# Initialen Prompt zum Chatverlauf hinzufügen
@ -199,7 +202,7 @@ class AgentService:
# Moderator trifft die Entscheidung
try:
moderator_chat = self._sanitize_messages(moderator_chat)
moderator_chat = await self._sanitize_messages(moderator_chat)
moderator_decision = await self.service_aichat.call_api(moderator_chat)
moderator_text = moderator_decision["choices"][0]["message"]["content"]
logger.debug(f"Full moderator decision text: {moderator_text}")
@ -243,7 +246,7 @@ class AgentService:
)
# Agent-spezifische Anweisungen erstellen
agent_instructions = agents.get_agent_instructions(selected_agent["type"], selected_agent)
agent_instructions = agents.get_agent_instructions(selected_agent["type"], selected_agent, file_contexts)
# Agent-Prompt erstellen
agent_prompt = agents.create_agent_prompt(selected_agent, agent_instructions)
@ -268,74 +271,10 @@ class AgentService:
# Agent führt seinen Teil aus
try:
# Initialisiere eine Schleife für mögliche Dateianfragen des Agenten
agent_processing_complete = False
max_file_requests = 5 # Begrenze die Anzahl der Dateianfragen pro Agent
file_request_count = 0
while not agent_processing_complete and file_request_count < max_file_requests:
# Rufe die API auf
agent_chat = self._sanitize_messages(agent_chat)
agent_response = await self.service_aichat.call_api(agent_chat)
agent_text = agent_response["choices"][0]["message"]["content"]
# Prüfe, ob der Agent weitere Dateien anfordert
file_commands = file_handling.parse_file_access_commands(agent_text)
if file_commands:
file_request_count += 1
self._add_log(
workflow_id,
f"Agent '{selected_agent['name']}' fordert zusätzliche Dateiinhalte an (Anfrage {file_request_count})",
"info",
selected_agent_id,
selected_agent["name"]
)
# Verarbeite alle Dateianfragen
file_responses = []
for cmd in file_commands:
file_id = cmd.get('file_id')
complete = cmd.get('complete', False)
start = cmd.get('start')
end = cmd.get('end')
pages = cmd.get('pages')
content = file_handling.load_additional_file_content(
workflow_id,
file_id,
file_contents,
file_contexts,
self._add_log,
read_complete=complete,
start_pos=start,
end_pos=end,
page_numbers=pages
)
if content:
file_name = next((f['name'] for f in file_contexts if f['id'] == file_id),
f"Datei {file_id}")
file_responses.append(f"Zusätzlicher Inhalt für {file_name}:\n\n{content}")
else:
file_responses.append(f"Datei mit ID {file_id} konnte nicht geladen werden.")
# Füge die Antworten zum Chatverlauf hinzu
file_response_text = "\n\n".join(file_responses)
system_response = {
"role": "system",
"content": (f"Hier sind die angeforderten Dateiinhalte:\n\n{file_response_text}\n\n" +
f"Bitte fahre nun mit deiner Analyse fort.").strip() }
# Füge Systemantwort zum Chatverlauf und zum Agentenchat hinzu
chat_history.append(system_response)
agent_chat.append(system_response)
# Setze die Schleife fort, um dem Agenten die Möglichkeit zu geben, seine Analyse fortzusetzen
continue
# Keine Dateianfragen mehr - Agent ist fertig
agent_processing_complete = True
# Da wir keine partielle Dateiladung benötigen, können wir den Agenten direkt aufrufen
agent_chat = await self._sanitize_messages(agent_chat)
agent_response = await self.service_aichat.call_api(agent_chat)
agent_text = agent_response["choices"][0]["message"]["content"]
# Füge die endgültige Antwort des Agenten zum Chatverlauf hinzu
chat_history.append({
@ -472,7 +411,7 @@ class AgentService:
# Workflow-Status auf "stopped" setzen
self.workflows[workflow_id]["status"] = "stopped"
self.workflows["progress"] = 1.0
self.workflows[workflow_id]["progress"] = 1.0 # Korrigierter Zugriff auf den Workflow
self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat()
# Log-Eintrag für das Stoppen hinzufügen
@ -484,25 +423,39 @@ class AgentService:
logger.info(f"Workflow {workflow_id} wurde gestoppt")
return True
def _sanitize_message_content(self, content):
"""Ensures content has no trailing whitespace."""
if isinstance(content, str):
return content.rstrip()
return content
def _sanitize_messages(self, messages):
async def _sanitize_messages(self, messages):
"""Sanitizes all messages to prevent API errors."""
if not messages:
return messages
sanitized = []
for message in messages:
sanitized_message = message.copy()
if "content" in sanitized_message:
# Create a deep copy of the message
sanitized_message = message.copy() if isinstance(message, dict) else message
if isinstance(sanitized_message, dict) and "content" in sanitized_message:
sanitized_message["content"] = self._sanitize_message_content(sanitized_message["content"])
sanitized.append(sanitized_message)
return sanitized
def _sanitize_message_content(self, content):
"""Ensures content has no trailing whitespace."""
if isinstance(content, str):
return content.rstrip()
# If content is a list (for multimodal messages), process each item
if isinstance(content, list):
return [
{**item, 'text': item['text'].rstrip() if isinstance(item.get('text'), str) else item.get('text')}
if isinstance(item, dict) and item.get('type') == 'text'
else item
for item in content
]
return content
# Singleton-Factory für AgentService-Instanzen pro Kontext
_agent_service_instances = {}

View file

@ -1,28 +1,76 @@
import os
import json
import logging
from typing import Dict, Any, List, Optional
import re
from typing import Dict, Any, List, Optional, Tuple
# Logger konfigurieren
logger = logging.getLogger(__name__)
def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None) -> str:
"""Minimalist agent instructions"""
if agent and agent.get("instructions"):
instructions = agent.get("instructions")
else:
instructions = """
Analysiere die Anfrage gründlich.
Liefere präzise und hilfreiche Antworten.
Strukturiere deine Antwort klar.
def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None, file_contexts: List[Dict[str, Any]] = None) -> str:
"""
Liefert Anweisungen für einen Agenten basierend auf seinen Attributen.
Diese Version fügt explizite Informationen zu Datei-IDs hinzu.
Args:
agent_type: Typ des Agenten
agent: Agent-Konfiguration mit allen Attributen
file_contexts: Liste der verfügbaren Dateien
Returns:
Formatierte Anweisungen für den Agenten
"""
# Basis-Instruktionen aus dem Agenten-Profil extrahieren
base_instructions = ""
if agent:
if agent.get("instructions"):
base_instructions += agent.get("instructions").strip() + "\n\n"
if agent.get("description"):
base_instructions += "Kontext: " + agent.get("description").strip() + "\n\n"
if agent.get("capabilities"):
base_instructions += "Deine Fähigkeiten: " + agent.get("capabilities").strip() + "\n\n"
# Wenn keine Instruktionen gefunden wurden, verwende eine generische Anweisung
if not base_instructions:
base_instructions = """
Analysiere die Anfrage gründlich und liefere ein konkretes, nützliches Ergebnis.
Strukturiere deine Antwort klar und beantworte alle Aspekte der Anfrage.
"""
file_access = """
Dateibefehl: [[FILE:load_file(file_id=ID, complete=true)]]
# Anweisung zur Selbstdeklaration des Ergebnisstatus hinzufügen
status_declaration = """
WICHTIG: Deklariere am Ende deiner Antwort den Status deines Ergebnisses mit einem der folgenden Tags:
[STATUS: ERGEBNIS] - Wenn du ein konkretes, vollständiges Ergebnis geliefert hast
[STATUS: TEILWEISE] - Wenn du ein teilweises Ergebnis geliefert hast, das noch ergänzt werden sollte
[STATUS: PLAN] - Wenn du hauptsächlich einen Plan oder eine Vorgehensweise vorgeschlagen hast
Diese Deklaration hilft dem Moderator zu entscheiden, ob weitere Agentenarbeit erforderlich ist.
"""
return instructions + file_access
# Konkrete Dateiinformationen bereitstellen
file_info = ""
if file_contexts:
file_info = "Verfügbare Dateien:\n"
for file in file_contexts:
file_info += f"- {file['name']} (Typ: {file.get('type', 'unbekannt')}, ID: {file['id']})\n"
file_info += "\n"
# Hinweise zum Dateiaufrufen
file_access = f"""
{file_info}
Um mehr Dateiinhalte anzufordern, verwende einen der folgenden Befehle:
[[FILE:load_file(file_id=DATEI_ID, complete=true)]] - für den vollständigen Dateiinhalt
[[FILE:load_file(file_id=DATEI_ID, pages=[1,2,3])]] - für spezifische Seiten einer PDF
[[FILE:load_file(file_id=DATEI_ID, start=0, end=5000)]] - für Textabschnitte
Ersetze DATEI_ID mit der tatsächlichen ID aus der Dateiliste oben (nur die ID-Nummer, keine Anführungszeichen).
"""
return base_instructions + status_declaration + file_access
def get_default_agent_instructions() -> str:
"""
@ -44,6 +92,9 @@ def get_default_agent_instructions() -> str:
- Gib gut begründete Antworten oder Empfehlungen
- Führe wichtige Erkenntnisse klar auf
- Schließe mit konkreten nächsten Schritten oder Empfehlungen ab
WICHTIG: Deklariere am Ende deiner Antwort den Status deines Ergebnisses:
[STATUS: ERGEBNIS], [STATUS: TEILWEISE] oder [STATUS: PLAN]
"""
@ -68,6 +119,7 @@ def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]
# Kopiere alle Felder vom Original-Agenten und füge used-Status hinzu
agent_data = agent.copy()
agent_data["used"] = False
agent_data["last_result_status"] = None # Für die Speicherung des Ergebnisstatus
available_agents[agent_id] = agent_data
@ -81,30 +133,63 @@ def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]
def get_moderator_prompt(available_agents: Dict[str, Dict[str, Any]]) -> str:
"""Streamlined moderator prompt"""
base = "Du bist Moderator eines Multi-Agent-Systems. Koordiniere die Agenten, um die Anfrage zu erfüllen."
"""
Erstellt einen Moderator-Prompt, der die Status-Deklarationen der Agenten berücksichtigt.
agents_list = "\nAgenten:\n"
Args:
available_agents: Dictionary mit verfügbaren Agenten
Returns:
Formatierter Prompt für den Moderator
"""
base = """Du bist Moderator eines Multi-Agent-Systems. Deine Aufgabe ist es, die Agenten zu koordinieren,
um die Anfrage vollständig zu erfüllen und ein konkretes Endergebnis zu liefern.
WICHTIG: Der Workflow darf erst beendet werden, wenn TATSÄCHLICHE ERGEBNISSE geliefert wurden."""
agents_list = "\nVerfügbare Agenten:\n"
for agent_id, agent in available_agents.items():
status = "" if agent["used"] else ""
agents_list += f"- {agent['name']} ({agent['type']}): {status}\n"
status = "✓ Bereits verwendet" if agent["used"] else "✗ Noch nicht verwendet"
result_status = ""
if agent["used"] and agent.get("last_result_status"):
result_status = f" (Letzte Antwort: {agent.get('last_result_status')})"
description = agent.get("description", "")
capabilities = agent.get("capabilities", "")
agents_list += f"- {agent['name']} ({agent['type']}): {capabilities}\n {description}\n Status: {status}{result_status}\n"
instructions = """
Pro Runde: Wähle EINEN Agenten ODER beende den Workflow nur wenn die Anfrage vollständig beantwortet ist.
Bei Agentenwahl: "Ich wähle [Agentname]"
Bei Abschluss: "Workflow beenden"
"""
Berücksichtige die STATUS-Deklarationen der Agenten bei deiner Entscheidung:
- [STATUS: ERGEBNIS] - Der Agent hat ein vollständiges Ergebnis geliefert
- [STATUS: TEILWEISE] - Der Agent hat ein teilweises Ergebnis geliefert, weitere Arbeit ist nötig
- [STATUS: PLAN] - Der Agent hat einen Plan geliefert, keine konkreten Ergebnisse
Mögliche Entscheidungen:
- Bei Agent-Auswahl: "Ich wähle [Agentname], um [konkrete Aufgabe]"
- Bei Abschluss (nur wenn [STATUS: ERGEBNIS] vorliegt): "Workflow beenden - vollständiges Ergebnis erreicht"
WICHTIG: Du darfst den Workflow NUR beenden, wenn mindestens ein Agent ein konkretes [STATUS: ERGEBNIS] geliefert hat!
"""
return base + agents_list + instructions
def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[str, str]:
"""Minimal agent prompt"""
def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str, file_contexts: List[Dict[str, Any]] = None) -> Dict[str, str]:
"""Verbesserter Agent-Prompt mit Datei-IDs"""
# Füge Datei-ID-Infos hinzu, wenn verfügbar
file_info = ""
if file_contexts:
file_info = "\nVerfügbare Dateien:\n"
for file in file_contexts:
file_info += f"- {file['name']} (ID: {file['id']})\n"
content = f"""
Du bist Agent {agent['name']} ({agent['type']}).
{agent_instructions}
{file_info}
Format: [Agent: {agent['name']}] Deine Antwort...
""".strip()
@ -112,28 +197,138 @@ def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[
def find_next_agent(moderator_text: str, available_agents: Dict[str, Dict[str, Any]]) -> Optional[str]:
"""Simplified agent selection logic"""
"""
Findet den nächsten Agenten basierend auf der Moderator-Entscheidung.
Berücksichtigt die Anweisung zum Workflow-Abschluss nur, wenn ein Ergebnis vorliegt.
Args:
moderator_text: Text der Moderator-Entscheidung
available_agents: Dictionary mit verfügbaren Agenten
Returns:
ID des nächsten Agenten oder "WORKFLOW_COMPLETE" zum Beenden
"""
text = moderator_text.lower()
# Check for workflow completion
if "workflow beenden" in text:
return "WORKFLOW_COMPLETE"
# Prüfe, ob der Workflow beendet werden soll - nur wenn vollständige Ergebnisse vorliegen
workflow_complete_phrases = [
"workflow beenden - vollständiges ergebnis erreicht",
"workflow beenden - vollständiges ergebnis",
"vollständiges ergebnis erreicht"
]
# Look for "ich wähle" pattern
# Prüfen, ob ein Agent ein Ergebnis geliefert hat
result_exists = False
for agent_id, agent in available_agents.items():
if agent.get("used") and agent.get("last_result_status") == "ERGEBNIS":
result_exists = True
break
# Suche nach exakten Phrasen für Workflow-Beendigung
if "workflow beenden" in text:
# Wenn ein eindeutiges Ergebnis vorliegt oder eine spezifische Phrase gefunden wurde
if result_exists or any(phrase in text for phrase in workflow_complete_phrases):
return "WORKFLOW_COMPLETE"
# Sonst: In den Logs warnen, dass der Moderator versucht, ohne Ergebnis zu beenden
else:
logger.warning("Moderator versuchte, Workflow ohne vollständiges Ergebnis zu beenden")
# Suche nach "ich wähle" Pattern für Agentenwahl
if "ich wähle" in text:
for agent_id, agent in available_agents.items():
if agent["name"].lower() in text:
agent_name_lower = agent["name"].lower()
if agent_name_lower in text:
return agent_id
# Simple name matching
# Fallback: Direktes Name-Matching
for agent_id, agent in available_agents.items():
if agent["name"].lower() in text:
agent_name_lower = agent["name"].lower()
if agent_name_lower in text or f"agent {agent_name_lower}" in text:
return agent_id
# Fallback: first unused agent
# Wenn kein Agent explizit gewählt wurde: Wähle den ersten unbenutzten Agenten
for agent_id, agent in available_agents.items():
if not agent["used"]:
return agent_id
# Last resort: first agent
return list(available_agents.keys())[0] if available_agents else None
# Letzter Ausweg: Ersten Agenten wiederverwenden
if available_agents:
return list(available_agents.keys())[0]
return None
def update_agent_results_with_status(content: str) -> Tuple[str, str]:
"""
Extrahiert den deklarierten Status aus einem Agenten-Ergebnis.
Args:
content: Der vom Agenten gelieferte Ergebnistext
Returns:
Tuple mit (bereinigter Text, Status)
"""
# Standard-Status, falls keine Deklaration gefunden wird
status = "UNBEKANNT"
# Suche nach Status-Deklaration
status_pattern = r'\[STATUS:\s*(ERGEBNIS|TEILWEISE|PLAN)\]'
match = re.search(status_pattern, content, re.IGNORECASE)
if match:
# Extrahiere den Status
status = match.group(1).upper()
# Entferne die Status-Deklaration aus dem Text
content = re.sub(status_pattern, '', content, flags=re.IGNORECASE).strip()
return content, status
def extract_summary(text: str, max_length: int = 200) -> str:
"""
Extrahiert eine kurze Zusammenfassung aus einem Text.
Args:
text: Der zu extrahierende Text
max_length: Maximale Länge der Zusammenfassung
Returns:
Eine kurze Zusammenfassung des Textes
"""
# Erste Zeilen oder Absätze extrahieren
lines = text.split('\n')
paragraphs = []
current_para = []
for line in lines:
line = line.strip()
if not line:
if current_para:
paragraphs.append(' '.join(current_para))
current_para = []
else:
current_para.append(line)
if current_para:
paragraphs.append(' '.join(current_para))
# Versuche, den ersten oder zweiten Absatz als Zusammenfassung zu verwenden
if paragraphs:
summary = paragraphs[0]
# Falls der erste Absatz zu kurz ist, versuche den zweiten hinzuzufügen
if len(summary) < 100 and len(paragraphs) > 1:
summary += " " + paragraphs[1]
# Kürze auf die maximale Länge
if len(summary) > max_length:
summary = summary[:max_length-3] + "..."
return summary
# Fallback auf die ersten Zeichen des Textes
if text:
return text[:max_length-3] + "..." if len(text) > max_length else text
return "Keine Zusammenfassung verfügbar"

View file

@ -45,7 +45,7 @@ def read_file_contents(
# Text-basierte Dateien direkt lesen
if file_type == "document":
# Einfache Textdateien
if file_name.endswith(('.txt', '.csv', '.md', '.json')):
if file_name.endswith(('.txt', '.md', '.json')):
with open(file_path, 'r', encoding='utf-8') as f:
file_contents[file_id] = f.read()
_log(add_log_func, workflow_id, f"Datei {file_name} gelesen", "info")
@ -56,8 +56,7 @@ def read_file_contents(
df = pd.read_excel(file_path)
file_contents[file_id] = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
file_contents[file_id] += "Erste 5 Zeilen:\n"
file_contents[file_id] += df.head(5).to_string()
file_contents[file_id] += df.to_string() # Vollständige Tabelle
_log(add_log_func, workflow_id, f"Excel-Datei {file_name} gelesen", "info")
except Exception as e:
_log(add_log_func, workflow_id, f"Fehler beim Lesen der Excel-Datei {file_name}: {str(e)}", "error")
@ -68,8 +67,7 @@ def read_file_contents(
df = pd.read_csv(file_path)
file_contents[file_id] = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
file_contents[file_id] += "Erste 5 Zeilen:\n"
file_contents[file_id] += df.head(5).to_string()
file_contents[file_id] += df.to_string() # Vollständige Tabelle
_log(add_log_func, workflow_id, f"CSV-Datei {file_name} gelesen", "info")
except Exception as e:
_log(add_log_func, workflow_id, f"Fehler beim Lesen der CSV-Datei {file_name}: {str(e)}", "error")
@ -84,7 +82,7 @@ def read_file_contents(
text = ""
for page in reader.pages:
text += page.extract_text() + "\n\n"
file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text[:2000]}..."
file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text}"
_log(add_log_func, workflow_id, f"PDF-Datei {file_name} gelesen", "info")
except ImportError:
_log(add_log_func, workflow_id,
@ -113,246 +111,6 @@ def read_file_contents(
return file_contents
def load_additional_file_content(
workflow_id: str,
file_id: str,
file_contents: Dict[str, str],
file_contexts: List[Dict[str, Any]],
add_log_func = None,
read_complete: bool = False,
start_pos: int = None,
end_pos: int = None,
page_numbers: List[int] = None
) -> Optional[str]:
"""
Lädt zusätzliche Dateiinhalte für einen Agenten nach.
Args:
workflow_id: ID des aktuellen Workflows für Logging
file_id: ID der Datei, deren Inhalt nachgeladen werden soll
file_contents: Dictionary mit bereits geladenen Dateiinhalten
file_contexts: Liste der Dateikontexte mit Metadaten
add_log_func: Funktion zum Hinzufügen von Logs
read_complete: Wenn True, wird die gesamte Datei geladen
start_pos: Startposition für einen Teilauszug (nur für Textdateien)
end_pos: Endposition für einen Teilauszug (nur für Textdateien)
page_numbers: Liste von Seitennummern für PDFs (1-basiert)
Returns:
Der nachgeladene Dateiinhalt oder None bei Fehler
"""
# Finde Dateikontext zur gegebenen file_id
file_context = next((f for f in file_contexts if f.get("id") == file_id), None)
if not file_context:
_log(add_log_func, workflow_id, f"Datei mit ID {file_id} nicht gefunden", "error")
return None
file_name = file_context.get("name", "Unbekannte Datei")
file_path = file_context.get("path", "")
file_type = file_context.get("type", "")
# Prüfe, ob Dateipfad existiert
if not file_path or not os.path.exists(file_path):
_log(add_log_func, workflow_id, f"Dateipfad für {file_name} nicht gefunden", "error")
return None
try:
# Behandlung je nach Dateityp
if file_name.endswith(('.txt', '.csv', '.md', '.json')):
# Einfache Textdateien
with open(file_path, 'r', encoding='utf-8') as f:
if read_complete:
content = f.read()
_log(add_log_func, workflow_id, f"Vollständige Datei {file_name} nachgeladen", "info")
elif start_pos is not None and end_pos is not None:
f.seek(start_pos)
content = f.read(end_pos - start_pos)
_log(add_log_func, workflow_id, f"Teilinhalt von {file_name} nachgeladen (Pos {start_pos}-{end_pos})", "info")
else:
# Standardverhalten: Lade ersten Teil der Datei
content = f.read(10000) # Größerer Ausschnitt als ursprünglich
_log(add_log_func, workflow_id, f"Erweiterten Teilinhalt von {file_name} nachgeladen", "info")
return content
elif file_name.endswith(('.xlsx', '.xls')):
# Excel-Dateien
df = pd.read_excel(file_path)
if read_complete:
content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
content += f"Spalten: {', '.join(df.columns.tolist())}\n\n"
content += df.to_string() # Komplette Tabelle
_log(add_log_func, workflow_id, f"Vollständige Excel-Datei {file_name} nachgeladen", "info")
else:
# Erweiterte Vorschau (mehr Zeilen als ursprünglich)
rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen
content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
content += f"Spalten: {', '.join(df.columns.tolist())}\n\n"
content += f"Erste {rows_to_show} Zeilen:\n"
content += df.head(rows_to_show).to_string()
_log(add_log_func, workflow_id, f"Erweiterte Vorschau der Excel-Datei {file_name} nachgeladen", "info")
return content
elif file_name.endswith('.csv'):
# CSV-Dateien
df = pd.read_csv(file_path)
if read_complete:
content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
content += f"Spalten: {', '.join(df.columns.tolist())}\n\n"
content += df.to_string() # Komplette Tabelle
_log(add_log_func, workflow_id, f"Vollständige CSV-Datei {file_name} nachgeladen", "info")
else:
# Erweiterte Vorschau
rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen
content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
content += f"Spalten: {', '.join(df.columns.tolist())}\n\n"
content += f"Erste {rows_to_show} Zeilen:\n"
content += df.head(rows_to_show).to_string()
_log(add_log_func, workflow_id, f"Erweiterte Vorschau der CSV-Datei {file_name} nachgeladen", "info")
return content
elif file_name.endswith('.pdf'):
# PDF-Dateien
try:
from PyPDF2 import PdfReader
reader = PdfReader(file_path)
num_pages = len(reader.pages)
if read_complete:
# Komplette PDF lesen
text = ""
for page in reader.pages:
text += page.extract_text() + "\n\n"
content = f"PDF mit {num_pages} Seiten.\nVollständiger Inhalt:\n{text}"
_log(add_log_func, workflow_id, f"Vollständige PDF-Datei {file_name} nachgeladen", "info")
elif page_numbers:
# Spezifische Seiten lesen
text = ""
valid_pages = [p-1 for p in page_numbers if 0 < p <= num_pages] # 0-basierter Index
for page_idx in valid_pages:
text += f"--- Seite {page_idx + 1} ---\n"
text += reader.pages[page_idx].extract_text() + "\n\n"
content = f"PDF mit {num_pages} Seiten.\nAngeforderte Seiten ({', '.join(map(str, page_numbers))}):\n{text}"
_log(add_log_func, workflow_id, f"Spezifische Seiten von PDF {file_name} nachgeladen", "info")
else:
# Erweiterte Vorschau (mehr Inhalt als ursprünglich)
text = ""
pages_to_show = min(5, num_pages) # Zeige bis zu 5 Seiten
for i in range(pages_to_show):
text += f"--- Seite {i + 1} ---\n"
text += reader.pages[i].extract_text() + "\n\n"
content = f"PDF mit {num_pages} Seiten.\nInhalt der ersten {pages_to_show} Seiten:\n{text}"
_log(add_log_func, workflow_id, f"Erweiterte Vorschau der PDF-Datei {file_name} nachgeladen", "info")
return content
except ImportError:
_log(add_log_func, workflow_id, "PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning")
return "PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)"
else:
# Andere Dokumenttypen - versuche generische Textextraktion
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read(10000 if not read_complete else None)
_log(add_log_func, workflow_id, f"Datei {file_name} als Text nachgeladen", "info")
return content
except:
_log(add_log_func, workflow_id, f"Datei {file_name} konnte nicht als Text gelesen werden", "warning")
return f"Dateiinhalt von {file_name} kann nicht gelesen werden (Nicht unterstütztes Format)"
except Exception as e:
_log(add_log_func, workflow_id, f"Fehler beim Nachladen von {file_name}: {str(e)}", "error")
return f"Fehler beim Nachladen der Datei {file_name}: {str(e)}"
def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]:
"""
Erkennt und parst Befehle zum Nachladen von Dateien im Text eines Agenten.
Die Befehle haben folgende Syntax:
[[FILE:load_file(file_id, complete=True/False, start=N, end=M, pages=[1,2,3])]]
Args:
agent_text: Der Text des Agenten
Returns:
Liste der erkannten und geparsten Dateizugriffsbefehle
"""
import re
# Muster für Dateizugriffsbefehle
pattern = r'\[\[FILE:load_file\(([^)]+)\)\]\]'
# Finde alle Treffer im Text
matches = re.findall(pattern, agent_text)
commands = []
for match in matches:
try:
# Parse die Parameter
command = {"complete": False, "start": None, "end": None, "pages": None}
# Teile nach Komma, aber beachte Listen wie [1,2,3]
params = []
param_str = ""
bracket_count = 0
for char in match + ',': # Komma am Ende hinzufügen für einfacheres Parsen
if char == ',' and bracket_count == 0:
params.append(param_str.strip())
param_str = ""
else:
param_str += char
if char == '[':
bracket_count += 1
elif char == ']':
bracket_count -= 1
# Verarbeite jeden Parameter
for param in params:
if '=' in param:
key, value = param.split('=', 1)
key = key.strip()
value = value.strip()
if key == 'file_id':
command['file_id'] = value.strip('"\'')
elif key == 'complete':
command['complete'] = value.lower() == 'true'
elif key == 'start':
command['start'] = int(value)
elif key == 'end':
command['end'] = int(value)
elif key == 'pages':
# Konvertiere String "[1,2,3]" zu Liste [1,2,3]
if value.startswith('[') and value.endswith(']'):
page_nums = []
for p in value[1:-1].split(','):
try:
page_nums.append(int(p.strip()))
except ValueError:
pass
command['pages'] = page_nums
else:
# Wenn nur file_id angegeben ist ohne key=value Format
command['file_id'] = param.strip('"\'')
# Nur hinzufügen, wenn file_id vorhanden ist
if 'file_id' in command:
commands.append(command)
except Exception as e:
logger.error(f"Fehler beim Parsen des Dateizugriffsbefehls: {str(e)}")
continue
return commands
def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents: Dict[str, str]) -> str:
"""
Erstellt eine formatierte Textdarstellung aller Dateien und ihrer Inhalte
@ -370,21 +128,14 @@ def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents:
for file in file_contexts
])
# Füge Dateiinhalte hinzu (mit Längenbegrenzung)
# Füge Dateiinhalte hinzu (ohne Längenbegrenzung)
for file_id, content in file_contents.items():
file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), "Unbekannte Datei")
file_context_text += f"\n\n==== DATEIINHALT: {file_name} (ID: {file_id}) ====\n"
# Begrenze den Inhalt, um Token-Limits zu respektieren
max_content_length = 5000 # Anpassen je nach Anzahl der Dateien und Umfang
if len(content) > max_content_length:
file_context_text += content[:max_content_length] + "...\n[Dateiinhalt gekürzt aus Platzgründen]"
else:
file_context_text += content
file_context_text += content
return file_context_text
def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[Dict[str, Any]]:
"""
Bereitet die Dateikontexte vor und ermittelt die vollen Dateipfade
@ -421,6 +172,27 @@ def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[
return file_contexts
async def prepare_message_for_ai(
file_contexts: List[Dict[str, Any]],
prompt_text: str,
file_contents: Dict[str, str],
service_aichat
) -> Dict[str, Any]:
"""
Bereitet eine vollständige Nachricht mit allen Dateiinhalten für das AI-Modell vor.
Benutzt den AI-Connector, um spezielle Datei-Analysen (wie Bild-Analysen) auszuführen.
Args:
file_contexts: Liste der Dateikontexte mit Metadaten
prompt_text: Der Text-Prompt
file_contents: Dictionary mit bereits geladenen Dateiinhalten
service_aichat: Die AI-Service-Instanz für spezielle Analysen
Returns:
Eine vollständig formatierte Nachricht für das AI-Modell
"""
# Rufe die Methode des AI-Connectors auf, um die Nachricht zu erstellen
return await service_aichat.parse_filedata(file_contexts, prompt_text, file_contents)
def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name=None):
"""Hilfsfunktion zum Loggen mit unterschiedlichen Log-Funktionen"""
@ -434,4 +206,27 @@ def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name
# Log über die bereitgestellte Log-Funktion (falls vorhanden)
if add_log_func and workflow_id:
add_log_func(workflow_id, message, log_type, agent_id, agent_name)
add_log_func(workflow_id, message, log_type, agent_id, agent_name)
# Die folgenden Funktionen werden nicht mehr benötigt, da partielle Dateiladungen entfallen
# Sie sind hier auskommentiert, könnten später aber wieder aktiviert werden
"""
def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]:
# Diese Funktion wird vorerst nicht benötigt
return []
def load_additional_file_content(
workflow_id: str,
file_id: str,
file_contents: Dict[str, str],
file_contexts: List[Dict[str, Any]],
add_log_func = None,
read_complete: bool = False,
start_pos: int = None,
end_pos: int = None,
page_numbers: List[int] = None
) -> Optional[str]:
# Diese Funktion wird vorerst nicht benötigt
return None
"""

View file

@ -19,7 +19,8 @@ def create_agent_result(
user_id: int = None
) -> Dict[str, Any]:
"""
Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort
Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort.
Diese Version berücksichtigt die Status-Deklaration des Agenten.
Args:
workflow_id: ID des Workflows
@ -34,10 +35,39 @@ def create_agent_result(
Returns:
Ein Ergebnisobjekt für den Workflow
"""
# Importiere die Hilfsfunktionen aus dem agents-Modul
from modules.agentservice_part_agents import update_agent_results_with_status, extract_summary
agent_type = agent["type"]
agent_id = agent["id"]
agent_name = agent["name"]
# Extrahiere den Status aus dem Ergebnis
cleaned_content, result_status = update_agent_results_with_status(content)
# Speichere den Status im Agenten für späteren Zugriff
agent["last_result_status"] = result_status
# Entferne Agent-Präfix aus der Antwort, falls vorhanden
agent_prefix = f"[Agent: {agent_name}]"
if cleaned_content.startswith(agent_prefix):
cleaned_content = cleaned_content[len(agent_prefix):].strip()
# Titel basierend auf Agent-Type und deklariertem Status erstellen
title_prefix = {
"ERGEBNIS": "Ergebnis",
"TEILWEISE": "Teilweises Ergebnis",
"PLAN": "Vorgeschlagener Plan",
"UNBEKANNT": "Antwort"
}.get(result_status, "Antwort")
# Titel basierend auf Agent-Attributen erstellen
agent_desc = agent.get("description", f"Agent {agent_name}")
title = f"{title_prefix}: {agent_desc}"
# Extrahiere Dateinamen für die Metadaten
file_names = [file["name"] for file in file_contexts]
# Grundlegende Ergebnisstruktur
result = {
"id": f"result_{workflow_id}_{index}",
@ -47,57 +77,20 @@ def create_agent_result(
"agent_name": agent_name,
"timestamp": datetime.now().isoformat(),
"type": "text", # Standardtyp
"title": title,
"content": cleaned_content,
"summary": extract_summary(cleaned_content, max_length=200),
"metadata": {
"files_processed": [file["name"] for file in file_contexts],
"prompt": prompt
"files_processed": file_names,
"prompt": prompt,
"agent_type": agent_type,
"result_status": result_status,
"capabilities": agent.get("capabilities", "")
}
}
# Titel und Inhalt basierend auf dem Agententyp anpassen
if agent_type == "analyzer":
result.update({
"title": "Datenanalyse-Ergebnis",
"content": content,
})
elif agent_type == "visualizer":
result.update({
"title": "Visualisierungsvorschlag",
"content": content,
"type": "chart" # Auch wenn kein echtes Diagramm, markieren wir es als solches
})
elif agent_type == "writer":
result.update({
"title": "Zusammenfassung und Empfehlungen",
"content": content,
})
elif agent_type == "scraper":
result.update({
"title": "Web-Recherche Ergebnisse",
"content": content,
})
elif agent_type == "initialisierung":
result.update({
"title": "Direkte Antwort",
"content": content,
})
elif agent_type == "organisator":
result.update({
"title": "Aufgabenstrukturierung",
"content": content,
})
elif agent_type == "entwickler":
result.update({
"title": "Code und Ausführungsergebnisse",
"content": content,
})
else:
result.update({
"title": f"Ergebnis von {agent_name}",
"content": content,
})
}
return result
def add_log(
workflows: Dict[str, Dict[str, Any]],
workflow_id: str,

View file

@ -1,115 +0,0 @@
import asyncio
import sys
import os
import json
import uuid
from pprint import pprint
# Stelle sicher, dass das Verzeichnis mit den Modulen im Pfad ist
sys.path.append(os.path.abspath(os.path.dirname(__file__)))
# Importiere das Hauptmodul
import modules.agentservice_interface as agi
async def test_agentservice():
"""
Einfacher Test für den AgentService mit Beispieldaten.
Führt einen Workflow mit einem einfachen Text-Prompt und optionalen Dateien aus.
"""
print("Starte Test des AgentService...")
# Erstelle einen neuen AgentService
service = agi.get_agentservice_interface(mandate_id=1, user_id=1)
# Erstelle eine Test-Workflow-ID
workflow_id = f"test_{uuid.uuid4()}"
# Beispiel-Prompt
prompt = "Erstelle eine Zusammenfassung der wichtigsten Punkte aus den verfügbaren Dateien."
# Beispiel-Agenten
agents = [
{
"id": "analyzer_1",
"name": "Datenanalyst",
"type": "analyzer",
"capabilities": "Analysiert Daten, erstellt Statistiken und erkennt Muster."
},
{
"id": "writer_1",
"name": "Zusammenfasser",
"type": "writer",
"capabilities": "Erstellt klare, prägnante Zusammenfassungen komplexer Inhalte."
},
{
"id": "init_1",
"name": "Initialisierer",
"type": "initialisierung",
"capabilities": "Gibt einen ersten Überblick und strukturiert die Aufgabe."
}
]
# Optional: Beispiel-Dateien
# Wenn du Dateien testen möchtest, passe diese Pfade an und setze use_files=True
files = []
use_files = False
if use_files:
files = [
{
"id": "text_1",
"name": "beispiel.txt",
"type": "document",
"path": "pfad/zu/beispiel.txt", # Passe diesen Pfad an
"size": "1KB"
},
# Weitere Dateien hier hinzufügen
]
# Führe den Workflow aus
try:
print(f"Starte Workflow {workflow_id}...")
result_workflow_id = await service.execute_workflow(workflow_id, prompt, agents, files)
print(f"Workflow {result_workflow_id} gestartet.")
# Warte und hole regelmäßig den Status ab
MAX_CHECKS = 30
check_interval = 2 # Sekunden
for i in range(MAX_CHECKS):
status = service.get_workflow_status(workflow_id)
print(f"Status nach {i*check_interval}s: {status['status']}, Fortschritt: {status['progress']*100:.1f}%")
if status['status'] in ['completed', 'failed']:
print("Workflow beendet!")
break
await asyncio.sleep(check_interval)
# Hole die Logs
logs = service.get_workflow_logs(workflow_id)
print("\n=== Logs ===")
for log in logs:
print(f"{log['timestamp'][:19]} [{log['type']}] {log['message']}")
# Hole die Ergebnisse
results = service.get_workflow_results(workflow_id)
print("\n=== Ergebnisse ===")
for result in results:
print(f"\nErgebnis von {result['agent_name']} ({result['agent_id']}):")
print(f"Titel: {result['title']}")
print("-" * 50)
print(result['content'][:500] + ("..." if len(result['content']) > 500 else ""))
print("-" * 50)
except Exception as e:
print(f"Fehler bei der Ausführung des Workflows: {str(e)}")
finally:
# Schließe den Service
await service.close()
print("Test abgeschlossen.")
if __name__ == "__main__":
# Führe den Test aus
asyncio.run(test_agentservice())

View file

@ -46,6 +46,7 @@ numpy>=1.24.2
openpyxl>=3.1.2 # For Excel file support
xlrd>=2.0.1 # For legacy Excel file support
PyPDF2>=3.0.1 # For PDF file support
pymupdf==1.22.5 # PDF picture extraction with module fitz
# === Agent Service Interface Dependencies ===
# HTTP Client and Web Scraping
@ -55,6 +56,7 @@ beautifulsoup4>=4.12.2 # For web scraping
lxml>=4.9.2 # For faster HTML parsing
html5lib>=1.1 # Alternative HTML parser
aiofiles>=23.1.0 # Async file operations
Pillow==10.2.0 # image explaining
# AI and NLP
openai>=0.27.4 # OpenAI API client