import os import logging import pandas as pd from typing import Dict, Any, List, Optional, Tuple # Logger konfigurieren logger = logging.getLogger(__name__) def read_file_contents( file_contexts: List[Dict[str, Any]], upload_dir: str, workflow_id: str = None, add_log_func = None ) -> Dict[str, str]: """ Liest die Inhalte aller Dateien und bereitet sie für die Verwendung vor. Args: file_contexts: Liste der Dateikontexte mit Metadaten upload_dir: Verzeichnis für Uploads workflow_id: Optional ID des Workflows für Logging add_log_func: Optionale Funktion zum Hinzufügen von Logs Returns: Dictionary mit Dateiinhalten (file_id -> Inhalt) """ file_contents = {} for file in file_contexts: file_id = file["id"] file_name = file["name"] file_type = file["type"] file_path = file.get("path", "") # Wenn Pfad nicht gesetzt, versuche ihn aus dem Upload-Verzeichnis abzuleiten if not file_path and file_name: possible_path = os.path.join(upload_dir, file_name) if os.path.exists(possible_path): file_path = possible_path logger.debug(f"Pfad für Datei {file_name} gefunden: {file_path}") # Dateiinhalt lesen, wenn der Pfad verfügbar ist if file_path and os.path.exists(file_path): try: # Text-basierte Dateien direkt lesen if file_type == "document": # Einfache Textdateien if file_name.endswith(('.txt', '.csv', '.md', '.json')): with open(file_path, 'r', encoding='utf-8') as f: file_contents[file_id] = f.read() _log(add_log_func, workflow_id, f"Datei {file_name} gelesen", "info") # Excel-Dateien elif file_name.endswith(('.xlsx', '.xls')): try: df = pd.read_excel(file_path) file_contents[file_id] = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n" file_contents[file_id] += "Erste 5 Zeilen:\n" file_contents[file_id] += df.head(5).to_string() _log(add_log_func, workflow_id, f"Excel-Datei {file_name} gelesen", "info") except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Lesen der Excel-Datei {file_name}: {str(e)}", "error") # CSV-Dateien elif file_name.endswith('.csv'): try: df = pd.read_csv(file_path) file_contents[file_id] = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n" file_contents[file_id] += "Erste 5 Zeilen:\n" file_contents[file_id] += df.head(5).to_string() _log(add_log_func, workflow_id, f"CSV-Datei {file_name} gelesen", "info") except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Lesen der CSV-Datei {file_name}: {str(e)}", "error") # PDF-Dateien elif file_name.endswith('.pdf'): try: # Falls PyPDF2 installiert ist try: from PyPDF2 import PdfReader reader = PdfReader(file_path) text = "" for page in reader.pages: text += page.extract_text() + "\n\n" file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text[:2000]}..." _log(add_log_func, workflow_id, f"PDF-Datei {file_name} gelesen", "info") except ImportError: _log(add_log_func, workflow_id, "PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning") file_contents[file_id] = f"PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)" except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Lesen der PDF-Datei {file_name}: {str(e)}", "error") # Andere Dokumenttypen else: _log(add_log_func, workflow_id, f"Nicht unterstütztes Dokumentformat: {file_name}", "warning") file_contents[file_id] = f"Dateiinhalt nicht verfügbar (Nicht unterstütztes Format)" # Bilddateien werden nicht direkt gelesen, nur Metadaten gespeichert elif file_type == "image": file_contents[file_id] = f"Bilddatei: {file_name} (Inhalt nicht als Text verfügbar)" except Exception as e: logger.error(f"Fehler beim Lesen der Datei {file_name}: {str(e)}") _log(add_log_func, workflow_id, f"Fehler beim Lesen der Datei {file_name}: {str(e)}", "error") else: if file_path: _log(add_log_func, workflow_id, f"Datei {file_name} nicht gefunden: {file_path}", "warning") else: _log(add_log_func, workflow_id, f"Kein Pfad für Datei {file_name} verfügbar", "warning") file_contents[file_id] = f"Dateiinhalt nicht verfügbar" return file_contents def load_additional_file_content( workflow_id: str, file_id: str, file_contents: Dict[str, str], file_contexts: List[Dict[str, Any]], add_log_func = None, read_complete: bool = False, start_pos: int = None, end_pos: int = None, page_numbers: List[int] = None ) -> Optional[str]: """ Lädt zusätzliche Dateiinhalte für einen Agenten nach. Args: workflow_id: ID des aktuellen Workflows für Logging file_id: ID der Datei, deren Inhalt nachgeladen werden soll file_contents: Dictionary mit bereits geladenen Dateiinhalten file_contexts: Liste der Dateikontexte mit Metadaten add_log_func: Funktion zum Hinzufügen von Logs read_complete: Wenn True, wird die gesamte Datei geladen start_pos: Startposition für einen Teilauszug (nur für Textdateien) end_pos: Endposition für einen Teilauszug (nur für Textdateien) page_numbers: Liste von Seitennummern für PDFs (1-basiert) Returns: Der nachgeladene Dateiinhalt oder None bei Fehler """ # Finde Dateikontext zur gegebenen file_id file_context = next((f for f in file_contexts if f.get("id") == file_id), None) if not file_context: _log(add_log_func, workflow_id, f"Datei mit ID {file_id} nicht gefunden", "error") return None file_name = file_context.get("name", "Unbekannte Datei") file_path = file_context.get("path", "") file_type = file_context.get("type", "") # Prüfe, ob Dateipfad existiert if not file_path or not os.path.exists(file_path): _log(add_log_func, workflow_id, f"Dateipfad für {file_name} nicht gefunden", "error") return None try: # Behandlung je nach Dateityp if file_name.endswith(('.txt', '.csv', '.md', '.json')): # Einfache Textdateien with open(file_path, 'r', encoding='utf-8') as f: if read_complete: content = f.read() _log(add_log_func, workflow_id, f"Vollständige Datei {file_name} nachgeladen", "info") elif start_pos is not None and end_pos is not None: f.seek(start_pos) content = f.read(end_pos - start_pos) _log(add_log_func, workflow_id, f"Teilinhalt von {file_name} nachgeladen (Pos {start_pos}-{end_pos})", "info") else: # Standardverhalten: Lade ersten Teil der Datei content = f.read(10000) # Größerer Ausschnitt als ursprünglich _log(add_log_func, workflow_id, f"Erweiterten Teilinhalt von {file_name} nachgeladen", "info") return content elif file_name.endswith(('.xlsx', '.xls')): # Excel-Dateien df = pd.read_excel(file_path) if read_complete: content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" content += df.to_string() # Komplette Tabelle _log(add_log_func, workflow_id, f"Vollständige Excel-Datei {file_name} nachgeladen", "info") else: # Erweiterte Vorschau (mehr Zeilen als ursprünglich) rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen content = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" content += f"Erste {rows_to_show} Zeilen:\n" content += df.head(rows_to_show).to_string() _log(add_log_func, workflow_id, f"Erweiterte Vorschau der Excel-Datei {file_name} nachgeladen", "info") return content elif file_name.endswith('.csv'): # CSV-Dateien df = pd.read_csv(file_path) if read_complete: content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" content += df.to_string() # Komplette Tabelle _log(add_log_func, workflow_id, f"Vollständige CSV-Datei {file_name} nachgeladen", "info") else: # Erweiterte Vorschau rows_to_show = min(20, len(df)) # Zeige bis zu 20 Zeilen content = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n" content += f"Spalten: {', '.join(df.columns.tolist())}\n\n" content += f"Erste {rows_to_show} Zeilen:\n" content += df.head(rows_to_show).to_string() _log(add_log_func, workflow_id, f"Erweiterte Vorschau der CSV-Datei {file_name} nachgeladen", "info") return content elif file_name.endswith('.pdf'): # PDF-Dateien try: from PyPDF2 import PdfReader reader = PdfReader(file_path) num_pages = len(reader.pages) if read_complete: # Komplette PDF lesen text = "" for page in reader.pages: text += page.extract_text() + "\n\n" content = f"PDF mit {num_pages} Seiten.\nVollständiger Inhalt:\n{text}" _log(add_log_func, workflow_id, f"Vollständige PDF-Datei {file_name} nachgeladen", "info") elif page_numbers: # Spezifische Seiten lesen text = "" valid_pages = [p-1 for p in page_numbers if 0 < p <= num_pages] # 0-basierter Index for page_idx in valid_pages: text += f"--- Seite {page_idx + 1} ---\n" text += reader.pages[page_idx].extract_text() + "\n\n" content = f"PDF mit {num_pages} Seiten.\nAngeforderte Seiten ({', '.join(map(str, page_numbers))}):\n{text}" _log(add_log_func, workflow_id, f"Spezifische Seiten von PDF {file_name} nachgeladen", "info") else: # Erweiterte Vorschau (mehr Inhalt als ursprünglich) text = "" pages_to_show = min(5, num_pages) # Zeige bis zu 5 Seiten for i in range(pages_to_show): text += f"--- Seite {i + 1} ---\n" text += reader.pages[i].extract_text() + "\n\n" content = f"PDF mit {num_pages} Seiten.\nInhalt der ersten {pages_to_show} Seiten:\n{text}" _log(add_log_func, workflow_id, f"Erweiterte Vorschau der PDF-Datei {file_name} nachgeladen", "info") return content except ImportError: _log(add_log_func, workflow_id, "PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning") return "PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)" else: # Andere Dokumenttypen - versuche generische Textextraktion try: with open(file_path, 'r', encoding='utf-8', errors='replace') as f: content = f.read(10000 if not read_complete else None) _log(add_log_func, workflow_id, f"Datei {file_name} als Text nachgeladen", "info") return content except: _log(add_log_func, workflow_id, f"Datei {file_name} konnte nicht als Text gelesen werden", "warning") return f"Dateiinhalt von {file_name} kann nicht gelesen werden (Nicht unterstütztes Format)" except Exception as e: _log(add_log_func, workflow_id, f"Fehler beim Nachladen von {file_name}: {str(e)}", "error") return f"Fehler beim Nachladen der Datei {file_name}: {str(e)}" def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]: """ Erkennt und parst Befehle zum Nachladen von Dateien im Text eines Agenten. Die Befehle haben folgende Syntax: [[FILE:load_file(file_id, complete=True/False, start=N, end=M, pages=[1,2,3])]] Args: agent_text: Der Text des Agenten Returns: Liste der erkannten und geparsten Dateizugriffsbefehle """ import re # Muster für Dateizugriffsbefehle pattern = r'\[\[FILE:load_file\(([^)]+)\)\]\]' # Finde alle Treffer im Text matches = re.findall(pattern, agent_text) commands = [] for match in matches: try: # Parse die Parameter command = {"complete": False, "start": None, "end": None, "pages": None} # Teile nach Komma, aber beachte Listen wie [1,2,3] params = [] param_str = "" bracket_count = 0 for char in match + ',': # Komma am Ende hinzufügen für einfacheres Parsen if char == ',' and bracket_count == 0: params.append(param_str.strip()) param_str = "" else: param_str += char if char == '[': bracket_count += 1 elif char == ']': bracket_count -= 1 # Verarbeite jeden Parameter for param in params: if '=' in param: key, value = param.split('=', 1) key = key.strip() value = value.strip() if key == 'file_id': command['file_id'] = value.strip('"\'') elif key == 'complete': command['complete'] = value.lower() == 'true' elif key == 'start': command['start'] = int(value) elif key == 'end': command['end'] = int(value) elif key == 'pages': # Konvertiere String "[1,2,3]" zu Liste [1,2,3] if value.startswith('[') and value.endswith(']'): page_nums = [] for p in value[1:-1].split(','): try: page_nums.append(int(p.strip())) except ValueError: pass command['pages'] = page_nums else: # Wenn nur file_id angegeben ist ohne key=value Format command['file_id'] = param.strip('"\'') # Nur hinzufügen, wenn file_id vorhanden ist if 'file_id' in command: commands.append(command) except Exception as e: logger.error(f"Fehler beim Parsen des Dateizugriffsbefehls: {str(e)}") continue return commands def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents: Dict[str, str]) -> str: """ Erstellt eine formatierte Textdarstellung aller Dateien und ihrer Inhalte Args: file_contexts: Liste der Dateikontexte mit Metadaten file_contents: Dictionary mit Dateiinhalten Returns: Formatierter Text mit Dateiliste und Inhaltsauszügen """ # Erstelle einen Kontext mit Dateiliste und Inhalten für leichteren Zugriff file_context_text = "Verfügbare Dateien:\n" + "\n".join([ f"- {file['name']} ({file['type']}, {file['size']}, ID: {file['id']})" for file in file_contexts ]) # Füge Dateiinhalte hinzu (mit Längenbegrenzung) for file_id, content in file_contents.items(): file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), "Unbekannte Datei") file_context_text += f"\n\n==== DATEIINHALT: {file_name} (ID: {file_id}) ====\n" # Begrenze den Inhalt, um Token-Limits zu respektieren max_content_length = 5000 # Anpassen je nach Anzahl der Dateien und Umfang if len(content) > max_content_length: file_context_text += content[:max_content_length] + "...\n[Dateiinhalt gekürzt aus Platzgründen]" else: file_context_text += content return file_context_text def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[Dict[str, Any]]: """ Bereitet die Dateikontexte vor und ermittelt die vollen Dateipfade Args: files: Liste von Dateien mit Metadaten (Dict mit id, name, type) upload_dir: Verzeichnis für Uploads Returns: Liste von Dateikontexten mit vollständigen Pfaden """ file_contexts = [] for file in files: file_id = file["id"] file_name = file["name"] file_type = file["type"] file_path = file.get("path", "") # Wenn kein Pfad angegeben ist, versuche, ihn aus dem Upload-Verzeichnis abzuleiten if not file_path and file_name: possible_path = os.path.join(upload_dir, file_name) if os.path.exists(possible_path): file_path = possible_path logger.debug(f"Pfad für Datei {file_name} gefunden: {file_path}") file_contexts.append({ "id": file_id, "name": file_name, "type": file_type, "size": file.get("size", "Unbekannt"), "path": file_path }) return file_contexts def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name=None): """Hilfsfunktion zum Loggen mit unterschiedlichen Log-Funktionen""" # Log über die Logger-Instanz if log_type == "error": logger.error(message) elif log_type == "warning": logger.warning(message) else: logger.info(message) # Log über die bereitgestellte Log-Funktion (falls vorhanden) if add_log_func and workflow_id: add_log_func(workflow_id, message, log_type, agent_id, agent_name)