From 78c583d50014649e95259e4954923cc9c13125ae Mon Sep 17 00:00:00 2001 From: valueon Date: Tue, 25 Mar 2025 21:23:21 +0100 Subject: [PATCH] img extraction direct and from pdf in all modes --- gwserver/_database_lucydom/files.json | 17 +- gwserver/app.py | 2 +- gwserver/config.ini | 2 +- gwserver/connector_aichat_openai.py | 349 ++++++++++++------ gwserver/connector_aiweb_webscraping.py | 1 + gwserver/modules/agentservice_interface.py | 125 ++----- gwserver/modules/agentservice_part_agents.py | 269 ++++++++++++-- .../modules/agentservice_part_filehandling.py | 307 +++------------ gwserver/modules/agentservice_part_results.py | 89 ++--- gwserver/test_agentservice.py | 115 ------ requirements.txt | 2 + 11 files changed, 617 insertions(+), 661 deletions(-) delete mode 100644 gwserver/test_agentservice.py diff --git a/gwserver/_database_lucydom/files.json b/gwserver/_database_lucydom/files.json index f3efcbb1..f8604f2d 100644 --- a/gwserver/_database_lucydom/files.json +++ b/gwserver/_database_lucydom/files.json @@ -13,12 +13,23 @@ { "mandate_id": 1, "user_id": 1, - "name": "test.pdf", + "name": "test_bild.pdf", "type": "document", "content_type": "application/pdf", "size": 299729, - "path": "./_uploads\\c30faecc-c041-4225-805f-741328a04bba.pdf", - "upload_date": "2025-03-24T16:58:50.089708", + "path": "./_uploads\\b73afc5a-131a-4db8-9572-be2a00c71d91.pdf", + "upload_date": "2025-03-25T17:42:03.548097", "id": 2 + }, + { + "mandate_id": 1, + "user_id": 1, + "name": "20240419_093309.jpg", + "type": "image", + "content_type": "image/jpeg", + "size": 286163, + "path": "./_uploads\\46a65f6f-a30c-4cf2-b0e2-c24709c3282e.jpg", + "upload_date": "2025-03-25T18:52:16.203894", + "id": 3 } ] \ No newline at end of file diff --git a/gwserver/app.py b/gwserver/app.py index 38cfd38a..27cf13ee 100644 --- a/gwserver/app.py +++ b/gwserver/app.py @@ -29,7 +29,7 @@ import modules.gateway_model as gateway_model # Konfiguration des Loggers logging.basicConfig( - level=logging.INFO, + level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[logging.StreamHandler()] ) diff --git a/gwserver/config.ini b/gwserver/config.ini index 5224d5fa..6e90d151 100644 --- a/gwserver/config.ini +++ b/gwserver/config.ini @@ -24,7 +24,7 @@ DEBUG = True UPLOAD_DIR = ./_uploads RESULTS_DIR = ./_results MAX_HISTORY = 50 -AI_PROVIDER = anthropic ; openai oder anthropic +AI_PROVIDER = openai ; openai oder anthropic [Connector_AiOpenai] API_KEY = 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 diff --git a/gwserver/connector_aichat_openai.py b/gwserver/connector_aichat_openai.py index 2c2b8762..60e488b2 100644 --- a/gwserver/connector_aichat_openai.py +++ b/gwserver/connector_aichat_openai.py @@ -1,5 +1,7 @@ +import io import os import json +import uuid import logging import httpx import base64 @@ -7,7 +9,10 @@ import mimetypes from typing import Dict, Any, List, Optional from fastapi import HTTPException import configload as configload - +import pandas as pd +from PIL import Image +import PyPDF2 +import fitz # Logger konfigurieren logger = logging.getLogger(__name__) @@ -91,72 +96,173 @@ class ChatService: logger.error(f"Fehler beim Aufruf der OpenAI API: {str(e)}") raise HTTPException(status_code=500, detail=f"Fehler beim Aufruf der OpenAI API: {str(e)}") - def prepare_file_message_content(self, prompt_text: str, file_paths: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + async def close(self): + """Schließt den HTTP-Client beim Beenden der Anwendung""" + await self.http_client.close() + + async def analyze_image(self, image_path: str, prompt: str) -> str: """ - Bereitet eine Nachricht mit Dateien für OpenAI API vor. + Analysiert ein Bild mit der OpenAI Vision API. Args: - prompt_text: Der Text-Prompt - file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path) + image_path: Pfad zum Bild + prompt: Der Prompt für die Analyse Returns: - Eine für OpenAI-API formatierte content-Liste + Die Antwort der OpenAI Vision API als Text """ - message_content = [ - { - "type": "text", - "text": prompt_text - } - ] - - # Füge Dateien als Base64-Anhänge hinzu - for file_info in file_paths: - file_path = file_info.get("path", "") - if file_path and os.path.exists(file_path): - try: - # Datei als Base64 codieren - with open(file_path, "rb") as f: - file_data = f.read() - base64_data = base64.b64encode(file_data).decode('utf-8') - - # MIME-Typ bestimmen - mime_type, _ = mimetypes.guess_type(file_path) - if not mime_type: - mime_type = "application/octet-stream" - - # Bei OpenAI werden Bilder anders behandelt als bei Anthropic - if mime_type.startswith("image/"): - message_content.append({ + try: + # Bild einlesen und als Base64 kodieren + with open(image_path, "rb") as f: + image_data = f.read() + + # In Base64-String konvertieren + base64_data = base64.b64encode(image_data).decode('utf-8') + + # MIME-Typ bestimmen + mime_type, _ = mimetypes.guess_type(image_path) + if not mime_type: + # Standard ist PNG, wenn Typ nicht bestimmt werden kann + mime_type = "image/png" + + # Vision-fähiges Modell für die Anfrage verwenden + vision_model = "gpt-4o" # oder aus der Konfiguration holen + + # Bereite den Payload für die Vision API vor + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { "type": "image_url", "image_url": { "url": f"data:{mime_type};base64,{base64_data}" } - }) - else: - # Für nicht-Bilder werden sie als Textbeschreibung hinzugefügt - message_content.append({ - "type": "text", - "text": f"[Datei: {file_info.get('name', 'Unbekannt')} (Typ: {mime_type})]" - }) - - logger.info(f"Datei {file_info.get('name', 'Unbekannt')} als Anhang hinzugefügt") - - except Exception as e: - logger.error(f"Fehler beim Hinzufügen der Datei {file_info.get('name', 'Unbekannt')}: {str(e)}") + } + ] + } + ] + + # Verwende die bestehende call_api Funktion mit dem Vision-Modell + response = await self.call_api(messages) + + # Inhalt extrahieren und zurückgeben + return response["choices"][0]["message"]["content"] - return message_content - - def parse_filedata(self, file_paths: List[Dict[str, Any]], prompt_text: str = "", file_contents: Dict[str, str] = None) -> Dict[str, Any]: + except Exception as e: + logger.error(f"Fehler bei der Bildanalyse {image_path}: {str(e)}", exc_info=True) + return f"[Fehler bei der Bildanalyse: {str(e)}]" + + + async def extract_and_analyze_pdf_images(self, pdf_path: str, prompt: str) -> List[Dict[str, Any]]: """ - Bereitet Dateien für die OpenAI API vor und erstellt ein einheitliches Message-Objekt. + Extrahiert Bilder aus einer PDF-Datei mit PyMuPDF und analysiert sie mit der Vision API. Args: - file_paths: Liste von Dateipfaden mit Metadaten (Dict mit id, name, type, path) - prompt_text: Der Text-Prompt, der zusammen mit den Dateien gesendet werden soll - file_contents: Optional vorgelesene Dateiinhalte als Dict[file_id, content] + pdf_path: Pfad zur PDF-Datei + prompt: Der Prompt für die Bildanalyse Returns: - Ein standardisiertes Message-Objekt, das für beide API-Typen verwendet werden kann + Eine Liste mit Analyseergebnissen für jedes Bild + """ + import uuid + import os + image_responses = [] + + try: + # PDF öffnen + doc = fitz.open(pdf_path) + logger.info(f"PDF geöffnet: {pdf_path} mit {len(doc)} Seiten") + + for page_num, page in enumerate(doc, 1): + # Alle Bilder auf der Seite finden + image_list = page.get_images(full=True) + + if image_list: + logger.info(f"Seite {page_num}: {len(image_list)} Bilder gefunden") + + for img_index, img in enumerate(image_list): + try: + # Bild-Referenz + xref = img[0] + + # Bild und Metadaten extrahieren + base_image = doc.extract_image(xref) + image_bytes = base_image["image"] # Tatsächliche Bilddaten + image_ext = base_image["ext"] # Dateiendung (jpg, png, etc.) + + # Speichere als temporäre Datei + unique_id = uuid.uuid4() + temp_img_path = f"temp_img_{page_num}_{img_index}_{unique_id}.{image_ext}" + + with open(temp_img_path, "wb") as img_file: + img_file.write(image_bytes) + + logger.debug(f"Bild temporär gespeichert: {temp_img_path}") + + # Analysiere mit Vision API + try: + analysis_result = await self.analyze_image(temp_img_path, prompt) + logger.debug(f"Bildanalyse für Bild {img_index} auf Seite {page_num} abgeschlossen") + except Exception as analyze_error: + logger.error(f"Fehler bei der Bildanalyse: {str(analyze_error)}") + analysis_result = f"[Fehler bei der Bildanalyse: {str(analyze_error)}]" + + # Ergebnis speichern + try: + # Versuche zuerst, die Größe aus base_image zu bekommen + if 'width' in base_image and 'height' in base_image: + image_size = f"{base_image['width']}x{base_image['height']}" + else: + # Alternative: Öffne das temporäre Bild, um die Größe zu bestimmen + with Image.open(temp_img_path) as img: + width, height = img.size + image_size = f"{width}x{height}" + except Exception as e: + logger.warning(f"Konnte Bildgröße nicht ermitteln: {str(e)}") + image_size = "unbekannt" + + image_responses.append({ + "page": page_num, + "image_index": img_index, + "format": image_ext, + "image_size": image_size, # Der key wird immer gesetzt, entweder mit tatsächlicher Größe oder "unbekannt" + "response": analysis_result + }) + + # Temporäre Datei löschen + try: + if os.path.exists(temp_img_path): + os.remove(temp_img_path) + logger.debug(f"Temporäre Bilddatei entfernt: {temp_img_path}") + except Exception as cleanup_error: + logger.warning(f"Temporäre Datei konnte nicht entfernt werden: {cleanup_error}") + + except Exception as e: + logger.warning(f"Fehler bei der Extraktion von Bild {img_index} auf Seite {page_num}: {str(e)}") + continue + + logger.info(f"Extrahiert und analysiert: {len(image_responses)} Bilder aus PDF {os.path.basename(pdf_path)}") + + except ImportError: + logger.error("PyMuPDF (fitz) ist nicht installiert. Installiere es mit 'pip install pymupdf'") + except Exception as e: + logger.error(f"Fehler beim Extrahieren von PDF-Bildern: {str(e)}") + return image_responses + + + async def parse_filedata(self, file_contexts: List[Dict[str, Any]], prompt_text: str, file_contents: Dict[str, str] = None) -> Dict[str, Any]: + """ + Bereitet eine vollständige Nachricht mit allen Dateiinhalten für das AI-Modell vor. + + Args: + file_contexts: Liste der Dateikontexte mit Metadaten + prompt_text: Der Text-Prompt + file_contents: Dictionary mit bereits geladenen Dateiinhalten + + Returns: + Eine vollständig formatierte Nachricht für das AI-Modell """ # Basisstruktur für die Nachricht in OpenAI-Format message = { @@ -171,101 +277,116 @@ class ChatService: "text": prompt_text }) + if not file_contents: + file_contents = {} + # Dateien als Anhänge hinzufügen - for file_info in file_paths: + for file_info in file_contexts: file_path = file_info.get("path", "") file_name = file_info.get("name", "") file_id = file_info.get("id", "") + file_type = file_info.get("type", "") # Prüfen, ob Dateiinhalt bereits vorhanden ist - if file_contents and file_id in file_contents: - # Bereits verarbeiteten Inhalt verwenden + if file_id in file_contents: + content = file_contents[file_id] + + # Problematische Unicode-Zeichen ersetzen + if isinstance(content, str): + content = content.encode('utf-8', errors='replace').decode('utf-8') + else: + content = str(content) + + # Besondere Verarbeitung für PDF-Dateien mit Bildern + if file_name.endswith('.pdf') and file_path and os.path.exists(file_path): + try: + # Bildanalyse der PDF durchführen + image_prompt = prompt_text or "Beschreibe detailliert, was du in diesem Bild siehst." + image_responses = await self.extract_and_analyze_pdf_images(file_path, image_prompt) + + # Nur wenn Bilder gefunden wurden, füge die Analyse hinzu + if image_responses: + image_details = "\n\n".join([ + f"Bild auf Seite {resp['page']} (Größe: {resp['image_size']}): {resp['response']}" + for resp in image_responses + ]) + logger.debug("Image description: "+image_details) + message["content"].append({ + "type": "text", + "text": f"PDF-Bildanalyse für {file_name}:\n{image_details}" + }) + except Exception as e: + logger.error(f"Fehler bei der PDF-Bildanalyse für {file_name}: {str(e)}") + + # Text zur Nachricht hinzufügen message["content"].append({ "type": "text", - "text": f"Datei: {file_name}\n{file_contents[file_id]}" + "text": f"--- DATEI: {file_name} ---\n\n{content}" }) - logger.info(f"Vorverarbeiteter Inhalt für Datei {file_name} verwendet") + logger.info(f"Inhalt für Datei {file_name} zur Nachricht hinzugefügt") continue - # Sonst Datei direkt verarbeiten - if file_path and os.path.exists(file_path): + # Wenn kein Inhalt vorhanden ist, füge einen Hinweis hinzu + if not file_path or not os.path.exists(file_path): + message["content"].append({ + "type": "text", + "text": f"[Datei {file_name} wurde nicht gefunden oder ist nicht zugänglich]" + }) + logger.warning(f"Datei {file_name} nicht gefunden: {file_path}") + continue + + # Direktes Hinzufügen von Bildern bei OpenAI (für Bilder außerhalb von PDFs) + if file_type == "image" or file_name.lower().endswith(('.jpg', '.jpeg', '.png', '.gif', '.webp')): try: - # Datei einlesen + # Bild als Base64 kodieren with open(file_path, "rb") as f: file_data = f.read() - - # MIME-Typ bestimmen mit python-magic, wenn verfügbar - try: - import magic - mime_type = magic.from_buffer(file_data, mime=True) - except ImportError: - # Fallback auf mimetypes, wenn python-magic nicht verfügbar ist - mime_type, _ = mimetypes.guess_type(file_path) - if not mime_type: - # Fallback auf Dateierweiterung - extension = os.path.splitext(file_path)[1].lower()[1:] - mime_type = get_mime_type_from_extension(extension) - - # Content-Type bestimmen für OpenAI - if mime_type.startswith("image/"): - # Bild als Base64 kodieren base64_data = base64.b64encode(file_data).decode('utf-8') + + # MIME-Typ bestimmen + mime_type, _ = mimetypes.guess_type(file_path) + if not mime_type: + mime_type = "application/octet-stream" + + # Bild zur Nachricht hinzufügen + if mime_type.startswith("image/"): + # Füge zunächst die Bildanalyse als Text hinzu + image_prompt = prompt_text or "Beschreibe detailliert, was du in diesem Bild siehst." + analysis_result = await self.analyze_image(file_path, image_prompt) + + message["content"].append({ + "type": "text", + "text": f"Bildanalyse für {file_name}:\n{analysis_result}" + }) + + # Dann füge das Bild selbst hinzu message["content"].append({ "type": "image_url", "image_url": { "url": f"data:{mime_type};base64,{base64_data}" } }) - else: - # Für nicht-Bild-Dateien als Text hinzufügen - try: - # Textdateien direkt als Text extrahieren - if mime_type in ["text/plain", "text/csv", "application/json", "text/html", "application/xml"]: - message["content"].append({ - "type": "text", - "text": f"[Datei: {file_name}]\n{file_data.decode('utf-8', errors='replace')}" - }) - else: - # Bei Binärdateien nur einen Hinweis einfügen - message["content"].append({ - "type": "text", - "text": f"[Datei: {file_name} (Typ: {mime_type}) ist verfügbar, aber kann nicht direkt angezeigt werden]" - }) - except UnicodeDecodeError: - # Bei Decodierungsfehlern nur einen Hinweis einfügen - message["content"].append({ - "type": "text", - "text": f"[Datei: {file_name} (Typ: {mime_type}) ist binär und kann nicht direkt angezeigt werden]" - }) - - logger.info(f"Datei {file_name} zum OpenAI-Nachrichtenobjekt hinzugefügt") - + + logger.info(f"Bild {file_name} analysiert und zur Nachricht hinzugefügt") except Exception as e: - logger.error(f"Fehler beim Hinzufügen der Datei {file_name}: {str(e)}") + logger.error(f"Fehler beim Hinzufügen des Bildes {file_name}: {str(e)}") message["content"].append({ "type": "text", - "text": f"[Fehler beim Laden der Datei {file_name}: {str(e)}]" + "text": f"[Fehler beim Laden des Bildes {file_name}: {str(e)}]" }) - else: - # Datei nicht gefunden - Hinweis einfügen - message["content"].append({ - "type": "text", - "text": f"[Datei {file_name} nicht verfügbar]" - }) - # Prüfe, ob wir ein leeres content-Array haben und wandle es in einen String um + # Optimiere das Message-Format für die API basierend auf dem Inhalt if not message["content"]: + # Leerer Inhalt - setze auf leeren String message["content"] = prompt_text or "" elif len(message["content"]) == 1 and message["content"][0]["type"] == "text": - # Wenn nur ein Text-Element vorhanden ist, vereinfachen wir die Struktur + # Nur ein Text-Element - vereinfache die Struktur message["content"] = message["content"][0]["text"] + # Bei komplizierten Inhalten (Mischung aus Text und Bildern) wird das Array-Format beibehalten + + logger.debug(f"Message-Objekt für OpenAI erstellt mit {len(message['content']) if isinstance(message['content'], list) else 'String'}-Inhalt") return message - - async def close(self): - """Schließt den HTTP-Client beim Beenden der Anwendung""" - await self.http_client.aclose() - def get_mime_type_from_extension(extension: str) -> str: """ diff --git a/gwserver/connector_aiweb_webscraping.py b/gwserver/connector_aiweb_webscraping.py index 66b4c5ee..77127350 100644 --- a/gwserver/connector_aiweb_webscraping.py +++ b/gwserver/connector_aiweb_webscraping.py @@ -9,6 +9,7 @@ import configload as configload import urllib.parse import time import random +import pandas as pd # Logger konfigurieren logger = logging.getLogger(__name__) diff --git a/gwserver/modules/agentservice_interface.py b/gwserver/modules/agentservice_interface.py index 1c2cf59a..4ab7a4a1 100644 --- a/gwserver/modules/agentservice_interface.py +++ b/gwserver/modules/agentservice_interface.py @@ -101,8 +101,8 @@ class AgentService: Anstatt die Agenten der Reihe nach abzuarbeiten, wird ein AI-Moderator verwendet, der die Agenten basierend auf ihren Fähigkeiten und bisherigen Antworten steuert. - Verwendet parse_filedata aus dem jeweiligen Connector für die Verarbeitung von Dateien, - wobei vorverarbeitete Dateiinhalte effizient wiederverwendet werden. + Verwendet file_handling.prepare_message_for_ai für die Vorbereitung der Nachrichten, + wobei alle Dateiinhalte vollständig gelesen werden. """ logger.info(f"Starte Workflow {workflow_id} mit {len(agents_list)} Agenten und {len(files)} Dateien") @@ -126,7 +126,9 @@ class AgentService: # Dateikontexte vorbereiten file_contexts = file_handling.prepare_file_contexts(files, self.upload_dir) - + for fc in file_contexts: + logger.debug(f"Dateikontext: ID={fc['id']}, Name={fc['name']}, Typ={fc.get('type', 'unbekannt')}") + # Dateiinhalte lesen - EINMAL für den gesamten Workflow file_contents = file_handling.read_file_contents( file_contexts, @@ -145,11 +147,12 @@ class AgentService: chat_history = [] # Erstelle das standardisierte Nachrichtenobjekt für die initialen Dateien und den Prompt - # Verwende parse_filedata aus dem Connector und übergebe die vorverarbeiteten Dateiinhalte - initial_message = self.service_aichat.parse_filedata( + # Verwende file_handling.prepare_message_for_ai mit dem Service + initial_message = await file_handling.prepare_message_for_ai( file_contexts, prompt, - file_contents # Übergebe die vorverarbeiteten Inhalte + file_contents, + self.service_aichat ) # Initialen Prompt zum Chatverlauf hinzufügen @@ -199,7 +202,7 @@ class AgentService: # Moderator trifft die Entscheidung try: - moderator_chat = self._sanitize_messages(moderator_chat) + moderator_chat = await self._sanitize_messages(moderator_chat) moderator_decision = await self.service_aichat.call_api(moderator_chat) moderator_text = moderator_decision["choices"][0]["message"]["content"] logger.debug(f"Full moderator decision text: {moderator_text}") @@ -243,7 +246,7 @@ class AgentService: ) # Agent-spezifische Anweisungen erstellen - agent_instructions = agents.get_agent_instructions(selected_agent["type"], selected_agent) + agent_instructions = agents.get_agent_instructions(selected_agent["type"], selected_agent, file_contexts) # Agent-Prompt erstellen agent_prompt = agents.create_agent_prompt(selected_agent, agent_instructions) @@ -268,74 +271,10 @@ class AgentService: # Agent führt seinen Teil aus try: - # Initialisiere eine Schleife für mögliche Dateianfragen des Agenten - agent_processing_complete = False - max_file_requests = 5 # Begrenze die Anzahl der Dateianfragen pro Agent - file_request_count = 0 - - while not agent_processing_complete and file_request_count < max_file_requests: - # Rufe die API auf - agent_chat = self._sanitize_messages(agent_chat) - agent_response = await self.service_aichat.call_api(agent_chat) - agent_text = agent_response["choices"][0]["message"]["content"] - - # Prüfe, ob der Agent weitere Dateien anfordert - file_commands = file_handling.parse_file_access_commands(agent_text) - - if file_commands: - file_request_count += 1 - self._add_log( - workflow_id, - f"Agent '{selected_agent['name']}' fordert zusätzliche Dateiinhalte an (Anfrage {file_request_count})", - "info", - selected_agent_id, - selected_agent["name"] - ) - - # Verarbeite alle Dateianfragen - file_responses = [] - for cmd in file_commands: - file_id = cmd.get('file_id') - complete = cmd.get('complete', False) - start = cmd.get('start') - end = cmd.get('end') - pages = cmd.get('pages') - - content = file_handling.load_additional_file_content( - workflow_id, - file_id, - file_contents, - file_contexts, - self._add_log, - read_complete=complete, - start_pos=start, - end_pos=end, - page_numbers=pages - ) - - if content: - file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), - f"Datei {file_id}") - file_responses.append(f"Zusätzlicher Inhalt für {file_name}:\n\n{content}") - else: - file_responses.append(f"Datei mit ID {file_id} konnte nicht geladen werden.") - - # Füge die Antworten zum Chatverlauf hinzu - file_response_text = "\n\n".join(file_responses) - system_response = { - "role": "system", - "content": (f"Hier sind die angeforderten Dateiinhalte:\n\n{file_response_text}\n\n" + - f"Bitte fahre nun mit deiner Analyse fort.").strip() } - - # Füge Systemantwort zum Chatverlauf und zum Agentenchat hinzu - chat_history.append(system_response) - agent_chat.append(system_response) - - # Setze die Schleife fort, um dem Agenten die Möglichkeit zu geben, seine Analyse fortzusetzen - continue - - # Keine Dateianfragen mehr - Agent ist fertig - agent_processing_complete = True + # Da wir keine partielle Dateiladung benötigen, können wir den Agenten direkt aufrufen + agent_chat = await self._sanitize_messages(agent_chat) + agent_response = await self.service_aichat.call_api(agent_chat) + agent_text = agent_response["choices"][0]["message"]["content"] # Füge die endgültige Antwort des Agenten zum Chatverlauf hinzu chat_history.append({ @@ -472,7 +411,7 @@ class AgentService: # Workflow-Status auf "stopped" setzen self.workflows[workflow_id]["status"] = "stopped" - self.workflows["progress"] = 1.0 + self.workflows[workflow_id]["progress"] = 1.0 # Korrigierter Zugriff auf den Workflow self.workflows[workflow_id]["completed_at"] = datetime.now().isoformat() # Log-Eintrag für das Stoppen hinzufügen @@ -484,25 +423,39 @@ class AgentService: logger.info(f"Workflow {workflow_id} wurde gestoppt") return True - def _sanitize_message_content(self, content): - """Ensures content has no trailing whitespace.""" - if isinstance(content, str): - return content.rstrip() - return content - - def _sanitize_messages(self, messages): + async def _sanitize_messages(self, messages): """Sanitizes all messages to prevent API errors.""" if not messages: return messages sanitized = [] for message in messages: - sanitized_message = message.copy() - if "content" in sanitized_message: + # Create a deep copy of the message + sanitized_message = message.copy() if isinstance(message, dict) else message + + if isinstance(sanitized_message, dict) and "content" in sanitized_message: sanitized_message["content"] = self._sanitize_message_content(sanitized_message["content"]) + sanitized.append(sanitized_message) + return sanitized + def _sanitize_message_content(self, content): + """Ensures content has no trailing whitespace.""" + if isinstance(content, str): + return content.rstrip() + + # If content is a list (for multimodal messages), process each item + if isinstance(content, list): + return [ + {**item, 'text': item['text'].rstrip() if isinstance(item.get('text'), str) else item.get('text')} + if isinstance(item, dict) and item.get('type') == 'text' + else item + for item in content + ] + + return content + # Singleton-Factory für AgentService-Instanzen pro Kontext _agent_service_instances = {} diff --git a/gwserver/modules/agentservice_part_agents.py b/gwserver/modules/agentservice_part_agents.py index 37e9a5d5..7e822974 100644 --- a/gwserver/modules/agentservice_part_agents.py +++ b/gwserver/modules/agentservice_part_agents.py @@ -1,28 +1,76 @@ import os import json import logging -from typing import Dict, Any, List, Optional +import re +from typing import Dict, Any, List, Optional, Tuple # Logger konfigurieren logger = logging.getLogger(__name__) -def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None) -> str: - """Minimalist agent instructions""" - if agent and agent.get("instructions"): - instructions = agent.get("instructions") - else: - instructions = """ - Analysiere die Anfrage gründlich. - Liefere präzise und hilfreiche Antworten. - Strukturiere deine Antwort klar. +def get_agent_instructions(agent_type: str, agent: Dict[str, Any] = None, file_contexts: List[Dict[str, Any]] = None) -> str: + """ + Liefert Anweisungen für einen Agenten basierend auf seinen Attributen. + + Diese Version fügt explizite Informationen zu Datei-IDs hinzu. + + Args: + agent_type: Typ des Agenten + agent: Agent-Konfiguration mit allen Attributen + file_contexts: Liste der verfügbaren Dateien + + Returns: + Formatierte Anweisungen für den Agenten + """ + # Basis-Instruktionen aus dem Agenten-Profil extrahieren + base_instructions = "" + + if agent: + if agent.get("instructions"): + base_instructions += agent.get("instructions").strip() + "\n\n" + if agent.get("description"): + base_instructions += "Kontext: " + agent.get("description").strip() + "\n\n" + if agent.get("capabilities"): + base_instructions += "Deine Fähigkeiten: " + agent.get("capabilities").strip() + "\n\n" + + # Wenn keine Instruktionen gefunden wurden, verwende eine generische Anweisung + if not base_instructions: + base_instructions = """ + Analysiere die Anfrage gründlich und liefere ein konkretes, nützliches Ergebnis. + Strukturiere deine Antwort klar und beantworte alle Aspekte der Anfrage. """ - file_access = """ - Dateibefehl: [[FILE:load_file(file_id=ID, complete=true)]] + # Anweisung zur Selbstdeklaration des Ergebnisstatus hinzufügen + status_declaration = """ + WICHTIG: Deklariere am Ende deiner Antwort den Status deines Ergebnisses mit einem der folgenden Tags: + + [STATUS: ERGEBNIS] - Wenn du ein konkretes, vollständiges Ergebnis geliefert hast + [STATUS: TEILWEISE] - Wenn du ein teilweises Ergebnis geliefert hast, das noch ergänzt werden sollte + [STATUS: PLAN] - Wenn du hauptsächlich einen Plan oder eine Vorgehensweise vorgeschlagen hast + + Diese Deklaration hilft dem Moderator zu entscheiden, ob weitere Agentenarbeit erforderlich ist. """ - return instructions + file_access - + # Konkrete Dateiinformationen bereitstellen + file_info = "" + if file_contexts: + file_info = "Verfügbare Dateien:\n" + for file in file_contexts: + file_info += f"- {file['name']} (Typ: {file.get('type', 'unbekannt')}, ID: {file['id']})\n" + file_info += "\n" + + # Hinweise zum Dateiaufrufen + file_access = f""" + {file_info} + Um mehr Dateiinhalte anzufordern, verwende einen der folgenden Befehle: + + [[FILE:load_file(file_id=DATEI_ID, complete=true)]] - für den vollständigen Dateiinhalt + [[FILE:load_file(file_id=DATEI_ID, pages=[1,2,3])]] - für spezifische Seiten einer PDF + [[FILE:load_file(file_id=DATEI_ID, start=0, end=5000)]] - für Textabschnitte + + Ersetze DATEI_ID mit der tatsächlichen ID aus der Dateiliste oben (nur die ID-Nummer, keine Anführungszeichen). + """ + + return base_instructions + status_declaration + file_access def get_default_agent_instructions() -> str: """ @@ -44,6 +92,9 @@ def get_default_agent_instructions() -> str: - Gib gut begründete Antworten oder Empfehlungen - Führe wichtige Erkenntnisse klar auf - Schließe mit konkreten nächsten Schritten oder Empfehlungen ab + + WICHTIG: Deklariere am Ende deiner Antwort den Status deines Ergebnisses: + [STATUS: ERGEBNIS], [STATUS: TEILWEISE] oder [STATUS: PLAN] """ @@ -68,6 +119,7 @@ def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]] # Kopiere alle Felder vom Original-Agenten und füge used-Status hinzu agent_data = agent.copy() agent_data["used"] = False + agent_data["last_result_status"] = None # Für die Speicherung des Ergebnisstatus available_agents[agent_id] = agent_data @@ -81,30 +133,63 @@ def initialize_agents(agents: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]] def get_moderator_prompt(available_agents: Dict[str, Dict[str, Any]]) -> str: - """Streamlined moderator prompt""" - base = "Du bist Moderator eines Multi-Agent-Systems. Koordiniere die Agenten, um die Anfrage zu erfüllen." + """ + Erstellt einen Moderator-Prompt, der die Status-Deklarationen der Agenten berücksichtigt. - agents_list = "\nAgenten:\n" + Args: + available_agents: Dictionary mit verfügbaren Agenten + + Returns: + Formatierter Prompt für den Moderator + """ + base = """Du bist Moderator eines Multi-Agent-Systems. Deine Aufgabe ist es, die Agenten zu koordinieren, +um die Anfrage vollständig zu erfüllen und ein konkretes Endergebnis zu liefern. + +WICHTIG: Der Workflow darf erst beendet werden, wenn TATSÄCHLICHE ERGEBNISSE geliefert wurden.""" + + agents_list = "\nVerfügbare Agenten:\n" for agent_id, agent in available_agents.items(): - status = "✓" if agent["used"] else "✗" - agents_list += f"- {agent['name']} ({agent['type']}): {status}\n" + status = "✓ Bereits verwendet" if agent["used"] else "✗ Noch nicht verwendet" + result_status = "" + if agent["used"] and agent.get("last_result_status"): + result_status = f" (Letzte Antwort: {agent.get('last_result_status')})" + + description = agent.get("description", "") + capabilities = agent.get("capabilities", "") + agents_list += f"- {agent['name']} ({agent['type']}): {capabilities}\n {description}\n Status: {status}{result_status}\n" instructions = """ - Pro Runde: Wähle EINEN Agenten ODER beende den Workflow nur wenn die Anfrage vollständig beantwortet ist. - - Bei Agentenwahl: "Ich wähle [Agentname]" - Bei Abschluss: "Workflow beenden" - """ +Berücksichtige die STATUS-Deklarationen der Agenten bei deiner Entscheidung: + +- [STATUS: ERGEBNIS] - Der Agent hat ein vollständiges Ergebnis geliefert +- [STATUS: TEILWEISE] - Der Agent hat ein teilweises Ergebnis geliefert, weitere Arbeit ist nötig +- [STATUS: PLAN] - Der Agent hat einen Plan geliefert, keine konkreten Ergebnisse + +Mögliche Entscheidungen: +- Bei Agent-Auswahl: "Ich wähle [Agentname], um [konkrete Aufgabe]" +- Bei Abschluss (nur wenn [STATUS: ERGEBNIS] vorliegt): "Workflow beenden - vollständiges Ergebnis erreicht" + +WICHTIG: Du darfst den Workflow NUR beenden, wenn mindestens ein Agent ein konkretes [STATUS: ERGEBNIS] geliefert hat! +""" return base + agents_list + instructions -def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[str, str]: - """Minimal agent prompt""" +def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str, file_contexts: List[Dict[str, Any]] = None) -> Dict[str, str]: + """Verbesserter Agent-Prompt mit Datei-IDs""" + # Füge Datei-ID-Infos hinzu, wenn verfügbar + file_info = "" + if file_contexts: + file_info = "\nVerfügbare Dateien:\n" + for file in file_contexts: + file_info += f"- {file['name']} (ID: {file['id']})\n" + content = f""" Du bist Agent {agent['name']} ({agent['type']}). {agent_instructions} + {file_info} + Format: [Agent: {agent['name']}] Deine Antwort... """.strip() @@ -112,28 +197,138 @@ def create_agent_prompt(agent: Dict[str, Any], agent_instructions: str) -> Dict[ def find_next_agent(moderator_text: str, available_agents: Dict[str, Dict[str, Any]]) -> Optional[str]: - """Simplified agent selection logic""" + """ + Findet den nächsten Agenten basierend auf der Moderator-Entscheidung. + Berücksichtigt die Anweisung zum Workflow-Abschluss nur, wenn ein Ergebnis vorliegt. + + Args: + moderator_text: Text der Moderator-Entscheidung + available_agents: Dictionary mit verfügbaren Agenten + + Returns: + ID des nächsten Agenten oder "WORKFLOW_COMPLETE" zum Beenden + """ text = moderator_text.lower() - # Check for workflow completion - if "workflow beenden" in text: - return "WORKFLOW_COMPLETE" + # Prüfe, ob der Workflow beendet werden soll - nur wenn vollständige Ergebnisse vorliegen + workflow_complete_phrases = [ + "workflow beenden - vollständiges ergebnis erreicht", + "workflow beenden - vollständiges ergebnis", + "vollständiges ergebnis erreicht" + ] - # Look for "ich wähle" pattern + # Prüfen, ob ein Agent ein Ergebnis geliefert hat + result_exists = False + for agent_id, agent in available_agents.items(): + if agent.get("used") and agent.get("last_result_status") == "ERGEBNIS": + result_exists = True + break + + # Suche nach exakten Phrasen für Workflow-Beendigung + if "workflow beenden" in text: + # Wenn ein eindeutiges Ergebnis vorliegt oder eine spezifische Phrase gefunden wurde + if result_exists or any(phrase in text for phrase in workflow_complete_phrases): + return "WORKFLOW_COMPLETE" + # Sonst: In den Logs warnen, dass der Moderator versucht, ohne Ergebnis zu beenden + else: + logger.warning("Moderator versuchte, Workflow ohne vollständiges Ergebnis zu beenden") + + # Suche nach "ich wähle" Pattern für Agentenwahl if "ich wähle" in text: for agent_id, agent in available_agents.items(): - if agent["name"].lower() in text: + agent_name_lower = agent["name"].lower() + if agent_name_lower in text: return agent_id - # Simple name matching + # Fallback: Direktes Name-Matching for agent_id, agent in available_agents.items(): - if agent["name"].lower() in text: + agent_name_lower = agent["name"].lower() + if agent_name_lower in text or f"agent {agent_name_lower}" in text: return agent_id - # Fallback: first unused agent + # Wenn kein Agent explizit gewählt wurde: Wähle den ersten unbenutzten Agenten for agent_id, agent in available_agents.items(): if not agent["used"]: return agent_id - # Last resort: first agent - return list(available_agents.keys())[0] if available_agents else None \ No newline at end of file + # Letzter Ausweg: Ersten Agenten wiederverwenden + if available_agents: + return list(available_agents.keys())[0] + + return None + + +def update_agent_results_with_status(content: str) -> Tuple[str, str]: + """ + Extrahiert den deklarierten Status aus einem Agenten-Ergebnis. + + Args: + content: Der vom Agenten gelieferte Ergebnistext + + Returns: + Tuple mit (bereinigter Text, Status) + """ + # Standard-Status, falls keine Deklaration gefunden wird + status = "UNBEKANNT" + + # Suche nach Status-Deklaration + status_pattern = r'\[STATUS:\s*(ERGEBNIS|TEILWEISE|PLAN)\]' + match = re.search(status_pattern, content, re.IGNORECASE) + + if match: + # Extrahiere den Status + status = match.group(1).upper() + + # Entferne die Status-Deklaration aus dem Text + content = re.sub(status_pattern, '', content, flags=re.IGNORECASE).strip() + + return content, status + + +def extract_summary(text: str, max_length: int = 200) -> str: + """ + Extrahiert eine kurze Zusammenfassung aus einem Text. + + Args: + text: Der zu extrahierende Text + max_length: Maximale Länge der Zusammenfassung + + Returns: + Eine kurze Zusammenfassung des Textes + """ + # Erste Zeilen oder Absätze extrahieren + lines = text.split('\n') + paragraphs = [] + current_para = [] + + for line in lines: + line = line.strip() + if not line: + if current_para: + paragraphs.append(' '.join(current_para)) + current_para = [] + else: + current_para.append(line) + + if current_para: + paragraphs.append(' '.join(current_para)) + + # Versuche, den ersten oder zweiten Absatz als Zusammenfassung zu verwenden + if paragraphs: + summary = paragraphs[0] + + # Falls der erste Absatz zu kurz ist, versuche den zweiten hinzuzufügen + if len(summary) < 100 and len(paragraphs) > 1: + summary += " " + paragraphs[1] + + # Kürze auf die maximale Länge + if len(summary) > max_length: + summary = summary[:max_length-3] + "..." + + return summary + + # Fallback auf die ersten Zeichen des Textes + if text: + return text[:max_length-3] + "..." if len(text) > max_length else text + + return "Keine Zusammenfassung verfügbar" \ No newline at end of file diff --git a/gwserver/modules/agentservice_part_filehandling.py b/gwserver/modules/agentservice_part_filehandling.py index 59ddcc93..8719bd33 100644 --- a/gwserver/modules/agentservice_part_filehandling.py +++ b/gwserver/modules/agentservice_part_filehandling.py @@ -45,7 +45,7 @@ def read_file_contents( # Text-basierte Dateien direkt lesen if file_type == "document": # Einfache Textdateien - if file_name.endswith(('.txt', '.csv', '.md', '.json')): + if file_name.endswith(('.txt', '.md', '.json')): with open(file_path, 'r', encoding='utf-8') as f: file_contents[file_id] = f.read() _log(add_log_func, workflow_id, f"Datei {file_name} gelesen", "info") @@ -56,8 +56,7 @@ def read_file_contents( df = pd.read_excel(file_path) file_contents[file_id] = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n" - file_contents[file_id] += "Erste 5 Zeilen:\n" - file_contents[file_id] += df.head(5).to_string() + file_contents[file_id] += df.to_string() # Vollständige Tabelle _log(add_log_func, workflow_id, f"Excel-Datei {file_name} gelesen", "info") except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Lesen der Excel-Datei {file_name}: {str(e)}", "error") @@ -68,8 +67,7 @@ def read_file_contents( df = pd.read_csv(file_path) file_contents[file_id] = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n" - file_contents[file_id] += "Erste 5 Zeilen:\n" - file_contents[file_id] += df.head(5).to_string() + file_contents[file_id] += df.to_string() # Vollständige Tabelle _log(add_log_func, workflow_id, f"CSV-Datei {file_name} gelesen", "info") except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Lesen der CSV-Datei {file_name}: {str(e)}", "error") @@ -84,7 +82,7 @@ def read_file_contents( text = "" for page in reader.pages: text += page.extract_text() + "\n\n" - file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text[:2000]}..." + file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text}" _log(add_log_func, workflow_id, f"PDF-Datei {file_name} gelesen", "info") except ImportError: _log(add_log_func, workflow_id, @@ -113,246 +111,6 @@ def read_file_contents( return file_contents -def load_additional_file_content( - workflow_id: str, - file_id: str, - file_contents: Dict[str, str], - file_contexts: List[Dict[str, Any]], - add_log_func = None, - read_complete: bool = False, - start_pos: int = None, - end_pos: int = None, - page_numbers: List[int] = None -) -> Optional[str]: - """ - Lädt zusätzliche Dateiinhalte für einen Agenten nach. - - Args: - workflow_id: ID des aktuellen Workflows für Logging - file_id: ID der Datei, deren Inhalt nachgeladen werden soll - file_contents: Dictionary mit bereits geladenen Dateiinhalten - file_contexts: Liste der Dateikontexte mit Metadaten - add_log_func: Funktion zum Hinzufügen von Logs - read_complete: Wenn True, wird die gesamte Datei geladen - start_pos: Startposition für einen Teilauszug (nur für Textdateien) - end_pos: Endposition für einen Teilauszug (nur für Textdateien) - page_numbers: Liste von Seitennummern für PDFs (1-basiert) - - Returns: - Der nachgeladene Dateiinhalt oder None bei Fehler - """ - # Finde Dateikontext zur gegebenen file_id - file_context = next((f for f in file_contexts if f.get("id") == file_id), None) - if not file_context: - _log(add_log_func, workflow_id, f"Datei mit ID {file_id} nicht gefunden", "error") - return None - - file_name = file_context.get("name", "Unbekannte Datei") - file_path = file_context.get("path", "") - file_type = file_context.get("type", "") - - # Prüfe, ob Dateipfad existiert - if not file_path or not os.path.exists(file_path): - _log(add_log_func, workflow_id, f"Dateipfad für {file_name} nicht gefunden", "error") - return None - - try: - # Behandlung je nach Dateityp - if file_name.endswith(('.txt', '.csv', '.md', '.json')): - # Einfache Textdateien - with open(file_path, 'r', encoding='utf-8') as f: - if read_complete: - content = f.read() - _log(add_log_func, workflow_id, f"Vollständige Datei {file_name} nachgeladen", "info") - elif start_pos is not None and end_pos is not None: - f.seek(start_pos) - content = f.read(end_pos - start_pos) - _log(add_log_func, workflow_id, f"Teilinhalt von {file_name} nachgeladen (Pos {start_pos}-{end_pos})", "info") - else: - # Standardverhalten: Lade ersten Teil der Datei - content = f.read(10000) # Größerer Ausschnitt als ursprünglich - _log(add_log_func, workflow_id, f"Erweiterten Teilinhalt von {file_name} nachgeladen", "info") - - return content - - elif file_name.endswith(('.xlsx', '.xls')): - # Excel-Dateien - df = pd.read_excel(file_path) - - if read_complete: - content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" - content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" - content += df.to_string() # Komplette Tabelle - _log(add_log_func, workflow_id, f"Vollständige Excel-Datei {file_name} nachgeladen", "info") - else: - # Erweiterte Vorschau (mehr Zeilen als ursprünglich) - rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen - content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" - content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" - content += f"Erste {rows_to_show} Zeilen:\n" - content += df.head(rows_to_show).to_string() - _log(add_log_func, workflow_id, f"Erweiterte Vorschau der Excel-Datei {file_name} nachgeladen", "info") - - return content - - elif file_name.endswith('.csv'): - # CSV-Dateien - df = pd.read_csv(file_path) - - if read_complete: - content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" - content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" - content += df.to_string() # Komplette Tabelle - _log(add_log_func, workflow_id, f"Vollständige CSV-Datei {file_name} nachgeladen", "info") - else: - # Erweiterte Vorschau - rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen - content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" - content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" - content += f"Erste {rows_to_show} Zeilen:\n" - content += df.head(rows_to_show).to_string() - _log(add_log_func, workflow_id, f"Erweiterte Vorschau der CSV-Datei {file_name} nachgeladen", "info") - - return content - - elif file_name.endswith('.pdf'): - # PDF-Dateien - try: - from PyPDF2 import PdfReader - reader = PdfReader(file_path) - num_pages = len(reader.pages) - - if read_complete: - # Komplette PDF lesen - text = "" - for page in reader.pages: - text += page.extract_text() + "\n\n" - content = f"PDF mit {num_pages} Seiten.\nVollständiger Inhalt:\n{text}" - _log(add_log_func, workflow_id, f"Vollständige PDF-Datei {file_name} nachgeladen", "info") - elif page_numbers: - # Spezifische Seiten lesen - text = "" - valid_pages = [p-1 for p in page_numbers if 0 < p <= num_pages] # 0-basierter Index - for page_idx in valid_pages: - text += f"--- Seite {page_idx + 1} ---\n" - text += reader.pages[page_idx].extract_text() + "\n\n" - content = f"PDF mit {num_pages} Seiten.\nAngeforderte Seiten ({', '.join(map(str, page_numbers))}):\n{text}" - _log(add_log_func, workflow_id, f"Spezifische Seiten von PDF {file_name} nachgeladen", "info") - else: - # Erweiterte Vorschau (mehr Inhalt als ursprünglich) - text = "" - pages_to_show = min(5, num_pages) # Zeige bis zu 5 Seiten - for i in range(pages_to_show): - text += f"--- Seite {i + 1} ---\n" - text += reader.pages[i].extract_text() + "\n\n" - content = f"PDF mit {num_pages} Seiten.\nInhalt der ersten {pages_to_show} Seiten:\n{text}" - _log(add_log_func, workflow_id, f"Erweiterte Vorschau der PDF-Datei {file_name} nachgeladen", "info") - - return content - - except ImportError: - _log(add_log_func, workflow_id, "PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning") - return "PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)" - - else: - # Andere Dokumenttypen - versuche generische Textextraktion - try: - with open(file_path, 'r', encoding='utf-8', errors='replace') as f: - content = f.read(10000 if not read_complete else None) - _log(add_log_func, workflow_id, f"Datei {file_name} als Text nachgeladen", "info") - return content - except: - _log(add_log_func, workflow_id, f"Datei {file_name} konnte nicht als Text gelesen werden", "warning") - return f"Dateiinhalt von {file_name} kann nicht gelesen werden (Nicht unterstütztes Format)" - - except Exception as e: - _log(add_log_func, workflow_id, f"Fehler beim Nachladen von {file_name}: {str(e)}", "error") - return f"Fehler beim Nachladen der Datei {file_name}: {str(e)}" - - -def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]: - """ - Erkennt und parst Befehle zum Nachladen von Dateien im Text eines Agenten. - - Die Befehle haben folgende Syntax: - [[FILE:load_file(file_id, complete=True/False, start=N, end=M, pages=[1,2,3])]] - - Args: - agent_text: Der Text des Agenten - - Returns: - Liste der erkannten und geparsten Dateizugriffsbefehle - """ - import re - - # Muster für Dateizugriffsbefehle - pattern = r'\[\[FILE:load_file\(([^)]+)\)\]\]' - - # Finde alle Treffer im Text - matches = re.findall(pattern, agent_text) - commands = [] - - for match in matches: - try: - # Parse die Parameter - command = {"complete": False, "start": None, "end": None, "pages": None} - - # Teile nach Komma, aber beachte Listen wie [1,2,3] - params = [] - param_str = "" - bracket_count = 0 - - for char in match + ',': # Komma am Ende hinzufügen für einfacheres Parsen - if char == ',' and bracket_count == 0: - params.append(param_str.strip()) - param_str = "" - else: - param_str += char - if char == '[': - bracket_count += 1 - elif char == ']': - bracket_count -= 1 - - # Verarbeite jeden Parameter - for param in params: - if '=' in param: - key, value = param.split('=', 1) - key = key.strip() - value = value.strip() - - if key == 'file_id': - command['file_id'] = value.strip('"\'') - elif key == 'complete': - command['complete'] = value.lower() == 'true' - elif key == 'start': - command['start'] = int(value) - elif key == 'end': - command['end'] = int(value) - elif key == 'pages': - # Konvertiere String "[1,2,3]" zu Liste [1,2,3] - if value.startswith('[') and value.endswith(']'): - page_nums = [] - for p in value[1:-1].split(','): - try: - page_nums.append(int(p.strip())) - except ValueError: - pass - command['pages'] = page_nums - else: - # Wenn nur file_id angegeben ist ohne key=value Format - command['file_id'] = param.strip('"\'') - - # Nur hinzufügen, wenn file_id vorhanden ist - if 'file_id' in command: - commands.append(command) - - except Exception as e: - logger.error(f"Fehler beim Parsen des Dateizugriffsbefehls: {str(e)}") - continue - - return commands - - def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents: Dict[str, str]) -> str: """ Erstellt eine formatierte Textdarstellung aller Dateien und ihrer Inhalte @@ -370,21 +128,14 @@ def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents: for file in file_contexts ]) - # Füge Dateiinhalte hinzu (mit Längenbegrenzung) + # Füge Dateiinhalte hinzu (ohne Längenbegrenzung) for file_id, content in file_contents.items(): file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), "Unbekannte Datei") file_context_text += f"\n\n==== DATEIINHALT: {file_name} (ID: {file_id}) ====\n" - - # Begrenze den Inhalt, um Token-Limits zu respektieren - max_content_length = 5000 # Anpassen je nach Anzahl der Dateien und Umfang - if len(content) > max_content_length: - file_context_text += content[:max_content_length] + "...\n[Dateiinhalt gekürzt aus Platzgründen]" - else: - file_context_text += content + file_context_text += content return file_context_text - def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[Dict[str, Any]]: """ Bereitet die Dateikontexte vor und ermittelt die vollen Dateipfade @@ -421,6 +172,27 @@ def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[ return file_contexts +async def prepare_message_for_ai( + file_contexts: List[Dict[str, Any]], + prompt_text: str, + file_contents: Dict[str, str], + service_aichat +) -> Dict[str, Any]: + """ + Bereitet eine vollständige Nachricht mit allen Dateiinhalten für das AI-Modell vor. + Benutzt den AI-Connector, um spezielle Datei-Analysen (wie Bild-Analysen) auszuführen. + + Args: + file_contexts: Liste der Dateikontexte mit Metadaten + prompt_text: Der Text-Prompt + file_contents: Dictionary mit bereits geladenen Dateiinhalten + service_aichat: Die AI-Service-Instanz für spezielle Analysen + + Returns: + Eine vollständig formatierte Nachricht für das AI-Modell + """ + # Rufe die Methode des AI-Connectors auf, um die Nachricht zu erstellen + return await service_aichat.parse_filedata(file_contexts, prompt_text, file_contents) def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name=None): """Hilfsfunktion zum Loggen mit unterschiedlichen Log-Funktionen""" @@ -434,4 +206,27 @@ def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name # Log über die bereitgestellte Log-Funktion (falls vorhanden) if add_log_func and workflow_id: - add_log_func(workflow_id, message, log_type, agent_id, agent_name) \ No newline at end of file + add_log_func(workflow_id, message, log_type, agent_id, agent_name) + +# Die folgenden Funktionen werden nicht mehr benötigt, da partielle Dateiladungen entfallen +# Sie sind hier auskommentiert, könnten später aber wieder aktiviert werden + +""" +def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]: + # Diese Funktion wird vorerst nicht benötigt + return [] + +def load_additional_file_content( + workflow_id: str, + file_id: str, + file_contents: Dict[str, str], + file_contexts: List[Dict[str, Any]], + add_log_func = None, + read_complete: bool = False, + start_pos: int = None, + end_pos: int = None, + page_numbers: List[int] = None +) -> Optional[str]: + # Diese Funktion wird vorerst nicht benötigt + return None +""" \ No newline at end of file diff --git a/gwserver/modules/agentservice_part_results.py b/gwserver/modules/agentservice_part_results.py index eb572713..e0a8837d 100644 --- a/gwserver/modules/agentservice_part_results.py +++ b/gwserver/modules/agentservice_part_results.py @@ -19,7 +19,8 @@ def create_agent_result( user_id: int = None ) -> Dict[str, Any]: """ - Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort + Erstellt ein Ergebnisobjekt basierend auf dem Agententyp und der API-Antwort. + Diese Version berücksichtigt die Status-Deklaration des Agenten. Args: workflow_id: ID des Workflows @@ -34,10 +35,39 @@ def create_agent_result( Returns: Ein Ergebnisobjekt für den Workflow """ + # Importiere die Hilfsfunktionen aus dem agents-Modul + from modules.agentservice_part_agents import update_agent_results_with_status, extract_summary + agent_type = agent["type"] agent_id = agent["id"] agent_name = agent["name"] + # Extrahiere den Status aus dem Ergebnis + cleaned_content, result_status = update_agent_results_with_status(content) + + # Speichere den Status im Agenten für späteren Zugriff + agent["last_result_status"] = result_status + + # Entferne Agent-Präfix aus der Antwort, falls vorhanden + agent_prefix = f"[Agent: {agent_name}]" + if cleaned_content.startswith(agent_prefix): + cleaned_content = cleaned_content[len(agent_prefix):].strip() + + # Titel basierend auf Agent-Type und deklariertem Status erstellen + title_prefix = { + "ERGEBNIS": "Ergebnis", + "TEILWEISE": "Teilweises Ergebnis", + "PLAN": "Vorgeschlagener Plan", + "UNBEKANNT": "Antwort" + }.get(result_status, "Antwort") + + # Titel basierend auf Agent-Attributen erstellen + agent_desc = agent.get("description", f"Agent {agent_name}") + title = f"{title_prefix}: {agent_desc}" + + # Extrahiere Dateinamen für die Metadaten + file_names = [file["name"] for file in file_contexts] + # Grundlegende Ergebnisstruktur result = { "id": f"result_{workflow_id}_{index}", @@ -47,57 +77,20 @@ def create_agent_result( "agent_name": agent_name, "timestamp": datetime.now().isoformat(), "type": "text", # Standardtyp + "title": title, + "content": cleaned_content, + "summary": extract_summary(cleaned_content, max_length=200), "metadata": { - "files_processed": [file["name"] for file in file_contexts], - "prompt": prompt + "files_processed": file_names, + "prompt": prompt, + "agent_type": agent_type, + "result_status": result_status, + "capabilities": agent.get("capabilities", "") } - } - - # Titel und Inhalt basierend auf dem Agententyp anpassen - if agent_type == "analyzer": - result.update({ - "title": "Datenanalyse-Ergebnis", - "content": content, - }) - elif agent_type == "visualizer": - result.update({ - "title": "Visualisierungsvorschlag", - "content": content, - "type": "chart" # Auch wenn kein echtes Diagramm, markieren wir es als solches - }) - elif agent_type == "writer": - result.update({ - "title": "Zusammenfassung und Empfehlungen", - "content": content, - }) - elif agent_type == "scraper": - result.update({ - "title": "Web-Recherche Ergebnisse", - "content": content, - }) - elif agent_type == "initialisierung": - result.update({ - "title": "Direkte Antwort", - "content": content, - }) - elif agent_type == "organisator": - result.update({ - "title": "Aufgabenstrukturierung", - "content": content, - }) - elif agent_type == "entwickler": - result.update({ - "title": "Code und Ausführungsergebnisse", - "content": content, - }) - else: - result.update({ - "title": f"Ergebnis von {agent_name}", - "content": content, - }) - + } return result + def add_log( workflows: Dict[str, Dict[str, Any]], workflow_id: str, diff --git a/gwserver/test_agentservice.py b/gwserver/test_agentservice.py deleted file mode 100644 index e07d6405..00000000 --- a/gwserver/test_agentservice.py +++ /dev/null @@ -1,115 +0,0 @@ -import asyncio -import sys -import os -import json -import uuid -from pprint import pprint - -# Stelle sicher, dass das Verzeichnis mit den Modulen im Pfad ist -sys.path.append(os.path.abspath(os.path.dirname(__file__))) - -# Importiere das Hauptmodul -import modules.agentservice_interface as agi - -async def test_agentservice(): - """ - Einfacher Test für den AgentService mit Beispieldaten. - Führt einen Workflow mit einem einfachen Text-Prompt und optionalen Dateien aus. - """ - print("Starte Test des AgentService...") - - # Erstelle einen neuen AgentService - service = agi.get_agentservice_interface(mandate_id=1, user_id=1) - - # Erstelle eine Test-Workflow-ID - workflow_id = f"test_{uuid.uuid4()}" - - # Beispiel-Prompt - prompt = "Erstelle eine Zusammenfassung der wichtigsten Punkte aus den verfügbaren Dateien." - - # Beispiel-Agenten - agents = [ - { - "id": "analyzer_1", - "name": "Datenanalyst", - "type": "analyzer", - "capabilities": "Analysiert Daten, erstellt Statistiken und erkennt Muster." - }, - { - "id": "writer_1", - "name": "Zusammenfasser", - "type": "writer", - "capabilities": "Erstellt klare, prägnante Zusammenfassungen komplexer Inhalte." - }, - { - "id": "init_1", - "name": "Initialisierer", - "type": "initialisierung", - "capabilities": "Gibt einen ersten Überblick und strukturiert die Aufgabe." - } - ] - - # Optional: Beispiel-Dateien - # Wenn du Dateien testen möchtest, passe diese Pfade an und setze use_files=True - files = [] - use_files = False - - if use_files: - files = [ - { - "id": "text_1", - "name": "beispiel.txt", - "type": "document", - "path": "pfad/zu/beispiel.txt", # Passe diesen Pfad an - "size": "1KB" - }, - # Weitere Dateien hier hinzufügen - ] - - # Führe den Workflow aus - try: - print(f"Starte Workflow {workflow_id}...") - result_workflow_id = await service.execute_workflow(workflow_id, prompt, agents, files) - print(f"Workflow {result_workflow_id} gestartet.") - - # Warte und hole regelmäßig den Status ab - MAX_CHECKS = 30 - check_interval = 2 # Sekunden - - for i in range(MAX_CHECKS): - status = service.get_workflow_status(workflow_id) - print(f"Status nach {i*check_interval}s: {status['status']}, Fortschritt: {status['progress']*100:.1f}%") - - if status['status'] in ['completed', 'failed']: - print("Workflow beendet!") - break - - await asyncio.sleep(check_interval) - - # Hole die Logs - logs = service.get_workflow_logs(workflow_id) - print("\n=== Logs ===") - for log in logs: - print(f"{log['timestamp'][:19]} [{log['type']}] {log['message']}") - - # Hole die Ergebnisse - results = service.get_workflow_results(workflow_id) - print("\n=== Ergebnisse ===") - for result in results: - print(f"\nErgebnis von {result['agent_name']} ({result['agent_id']}):") - print(f"Titel: {result['title']}") - print("-" * 50) - print(result['content'][:500] + ("..." if len(result['content']) > 500 else "")) - print("-" * 50) - - except Exception as e: - print(f"Fehler bei der Ausführung des Workflows: {str(e)}") - finally: - # Schließe den Service - await service.close() - - print("Test abgeschlossen.") - -if __name__ == "__main__": - # Führe den Test aus - asyncio.run(test_agentservice()) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index fe13668a..6c3ec45d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -46,6 +46,7 @@ numpy>=1.24.2 openpyxl>=3.1.2 # For Excel file support xlrd>=2.0.1 # For legacy Excel file support PyPDF2>=3.0.1 # For PDF file support +pymupdf==1.22.5 # PDF picture extraction with module fitz # === Agent Service Interface Dependencies === # HTTP Client and Web Scraping @@ -55,6 +56,7 @@ beautifulsoup4>=4.12.2 # For web scraping lxml>=4.9.2 # For faster HTML parsing html5lib>=1.1 # Alternative HTML parser aiofiles>=23.1.0 # Async file operations +Pillow==10.2.0 # image explaining # AI and NLP openai>=0.27.4 # OpenAI API client