232 lines
No EOL
10 KiB
Python
232 lines
No EOL
10 KiB
Python
import os
|
|
import logging
|
|
import pandas as pd
|
|
from typing import Dict, Any, List, Optional, Tuple
|
|
|
|
# Logger konfigurieren
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def read_file_contents(
|
|
file_contexts: List[Dict[str, Any]],
|
|
upload_dir: str,
|
|
workflow_id: str = None,
|
|
add_log_func = None
|
|
) -> Dict[str, str]:
|
|
"""
|
|
Liest die Inhalte aller Dateien und bereitet sie für die Verwendung vor.
|
|
|
|
Args:
|
|
file_contexts: Liste der Dateikontexte mit Metadaten
|
|
upload_dir: Verzeichnis für Uploads
|
|
workflow_id: Optional ID des Workflows für Logging
|
|
add_log_func: Optionale Funktion zum Hinzufügen von Logs
|
|
|
|
Returns:
|
|
Dictionary mit Dateiinhalten (file_id -> Inhalt)
|
|
"""
|
|
file_contents = {}
|
|
|
|
for file in file_contexts:
|
|
file_id = file["id"]
|
|
file_name = file["name"]
|
|
file_type = file["type"]
|
|
file_path = file.get("path", "")
|
|
|
|
# Wenn Pfad nicht gesetzt, versuche ihn aus dem Upload-Verzeichnis abzuleiten
|
|
if not file_path and file_name:
|
|
possible_path = os.path.join(upload_dir, file_name)
|
|
if os.path.exists(possible_path):
|
|
file_path = possible_path
|
|
logger.debug(f"Pfad für Datei {file_name} gefunden: {file_path}")
|
|
|
|
# Dateiinhalt lesen, wenn der Pfad verfügbar ist
|
|
if file_path and os.path.exists(file_path):
|
|
try:
|
|
# Text-basierte Dateien direkt lesen
|
|
if file_type == "document":
|
|
# Einfache Textdateien
|
|
if file_name.endswith(('.txt', '.md', '.json')):
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
file_contents[file_id] = f.read()
|
|
_log(add_log_func, workflow_id, f"Datei {file_name} gelesen", "info")
|
|
|
|
# Excel-Dateien
|
|
elif file_name.endswith(('.xlsx', '.xls')):
|
|
try:
|
|
df = pd.read_excel(file_path)
|
|
file_contents[file_id] = f"Excel-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
|
|
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
|
|
file_contents[file_id] += df.to_string() # Vollständige Tabelle
|
|
_log(add_log_func, workflow_id, f"Excel-Datei {file_name} gelesen", "info")
|
|
except Exception as e:
|
|
_log(add_log_func, workflow_id, f"Fehler beim Lesen der Excel-Datei {file_name}: {str(e)}", "error")
|
|
|
|
# CSV-Dateien
|
|
elif file_name.endswith('.csv'):
|
|
try:
|
|
df = pd.read_csv(file_path)
|
|
file_contents[file_id] = f"CSV-Datei mit {len(df)} Zeilen und {len(df.columns)} Spalten.\n"
|
|
file_contents[file_id] += f"Spalten: {', '.join(df.columns.tolist())}\n"
|
|
file_contents[file_id] += df.to_string() # Vollständige Tabelle
|
|
_log(add_log_func, workflow_id, f"CSV-Datei {file_name} gelesen", "info")
|
|
except Exception as e:
|
|
_log(add_log_func, workflow_id, f"Fehler beim Lesen der CSV-Datei {file_name}: {str(e)}", "error")
|
|
|
|
# PDF-Dateien
|
|
elif file_name.endswith('.pdf'):
|
|
try:
|
|
# Falls PyPDF2 installiert ist
|
|
try:
|
|
from PyPDF2 import PdfReader
|
|
reader = PdfReader(file_path)
|
|
text = ""
|
|
for page in reader.pages:
|
|
text += page.extract_text() + "\n\n"
|
|
file_contents[file_id] = f"PDF mit {len(reader.pages)} Seiten.\nInhalt:\n{text}"
|
|
_log(add_log_func, workflow_id, f"PDF-Datei {file_name} gelesen", "info")
|
|
except ImportError:
|
|
_log(add_log_func, workflow_id,
|
|
"PyPDF2 nicht installiert. PDF-Inhalt kann nicht extrahiert werden.", "warning")
|
|
file_contents[file_id] = f"PDF-Datei (Inhalt nicht verfügbar, PyPDF2 fehlt)"
|
|
except Exception as e:
|
|
_log(add_log_func, workflow_id, f"Fehler beim Lesen der PDF-Datei {file_name}: {str(e)}", "error")
|
|
|
|
# Andere Dokumenttypen
|
|
else:
|
|
_log(add_log_func, workflow_id, f"Nicht unterstütztes Dokumentformat: {file_name}", "warning")
|
|
file_contents[file_id] = f"Dateiinhalt nicht verfügbar (Nicht unterstütztes Format)"
|
|
|
|
# Bilddateien werden nicht direkt gelesen, nur Metadaten gespeichert
|
|
elif file_type == "image":
|
|
file_contents[file_id] = f"Bilddatei: {file_name} (Inhalt nicht als Text verfügbar)"
|
|
except Exception as e:
|
|
logger.error(f"Fehler beim Lesen der Datei {file_name}: {str(e)}")
|
|
_log(add_log_func, workflow_id, f"Fehler beim Lesen der Datei {file_name}: {str(e)}", "error")
|
|
else:
|
|
if file_path:
|
|
_log(add_log_func, workflow_id, f"Datei {file_name} nicht gefunden: {file_path}", "warning")
|
|
else:
|
|
_log(add_log_func, workflow_id, f"Kein Pfad für Datei {file_name} verfügbar", "warning")
|
|
file_contents[file_id] = f"Dateiinhalt nicht verfügbar"
|
|
|
|
return file_contents
|
|
|
|
def format_file_context_text(file_contexts: List[Dict[str, Any]], file_contents: Dict[str, str]) -> str:
|
|
"""
|
|
Erstellt eine formatierte Textdarstellung aller Dateien und ihrer Inhalte
|
|
|
|
Args:
|
|
file_contexts: Liste der Dateikontexte mit Metadaten
|
|
file_contents: Dictionary mit Dateiinhalten
|
|
|
|
Returns:
|
|
Formatierter Text mit Dateiliste und Inhaltsauszügen
|
|
"""
|
|
# Erstelle einen Kontext mit Dateiliste und Inhalten für leichteren Zugriff
|
|
file_context_text = "Verfügbare Dateien:\n" + "\n".join([
|
|
f"- {file['name']} ({file['type']}, {file['size']}, ID: {file['id']})"
|
|
for file in file_contexts
|
|
])
|
|
|
|
# Füge Dateiinhalte hinzu (ohne Längenbegrenzung)
|
|
for file_id, content in file_contents.items():
|
|
file_name = next((f['name'] for f in file_contexts if f['id'] == file_id), "Unbekannte Datei")
|
|
file_context_text += f"\n\n==== DATEIINHALT: {file_name} (ID: {file_id}) ====\n"
|
|
file_context_text += content
|
|
|
|
return file_context_text
|
|
|
|
def prepare_file_contexts(files: List[Dict[str, Any]], upload_dir: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Bereitet die Dateikontexte vor und ermittelt die vollen Dateipfade
|
|
|
|
Args:
|
|
files: Liste von Dateien mit Metadaten (Dict mit id, name, type)
|
|
upload_dir: Verzeichnis für Uploads
|
|
|
|
Returns:
|
|
Liste von Dateikontexten mit vollständigen Pfaden
|
|
"""
|
|
file_contexts = []
|
|
|
|
for file in files:
|
|
file_id = file["id"]
|
|
file_name = file["name"]
|
|
file_type = file["type"]
|
|
file_path = file.get("path", "")
|
|
|
|
# Wenn kein Pfad angegeben ist, versuche, ihn aus dem Upload-Verzeichnis abzuleiten
|
|
if not file_path and file_name:
|
|
possible_path = os.path.join(upload_dir, file_name)
|
|
if os.path.exists(possible_path):
|
|
file_path = possible_path
|
|
logger.debug(f"Pfad für Datei {file_name} gefunden: {file_path}")
|
|
|
|
file_contexts.append({
|
|
"id": file_id,
|
|
"name": file_name,
|
|
"type": file_type,
|
|
"size": file.get("size", "Unbekannt"),
|
|
"path": file_path
|
|
})
|
|
|
|
return file_contexts
|
|
|
|
async def prepare_message_for_ai(
|
|
file_contexts: List[Dict[str, Any]],
|
|
prompt_text: str,
|
|
file_contents: Dict[str, str],
|
|
service_aichat
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Bereitet eine vollständige Nachricht mit allen Dateiinhalten für das AI-Modell vor.
|
|
Benutzt den AI-Connector, um spezielle Datei-Analysen (wie Bild-Analysen) auszuführen.
|
|
|
|
Args:
|
|
file_contexts: Liste der Dateikontexte mit Metadaten
|
|
prompt_text: Der Text-Prompt
|
|
file_contents: Dictionary mit bereits geladenen Dateiinhalten
|
|
service_aichat: Die AI-Service-Instanz für spezielle Analysen
|
|
|
|
Returns:
|
|
Eine vollständig formatierte Nachricht für das AI-Modell
|
|
"""
|
|
# Rufe die Methode des AI-Connectors auf, um die Nachricht zu erstellen
|
|
return await service_aichat.parse_filedata(file_contexts, prompt_text, file_contents)
|
|
|
|
def _log(add_log_func, workflow_id, message, log_type, agent_id=None, agent_name=None):
|
|
"""Hilfsfunktion zum Loggen mit unterschiedlichen Log-Funktionen"""
|
|
# Log über die Logger-Instanz
|
|
if log_type == "error":
|
|
logger.error(message)
|
|
elif log_type == "warning":
|
|
logger.warning(message)
|
|
else:
|
|
logger.info(message)
|
|
|
|
# Log über die bereitgestellte Log-Funktion (falls vorhanden)
|
|
if add_log_func and workflow_id:
|
|
add_log_func(workflow_id, message, log_type, agent_id, agent_name)
|
|
|
|
# Die folgenden Funktionen werden nicht mehr benötigt, da partielle Dateiladungen entfallen
|
|
# Sie sind hier auskommentiert, könnten später aber wieder aktiviert werden
|
|
|
|
"""
|
|
def parse_file_access_commands(agent_text: str) -> List[Dict[str, Any]]:
|
|
# Diese Funktion wird vorerst nicht benötigt
|
|
return []
|
|
|
|
def load_additional_file_content(
|
|
workflow_id: str,
|
|
file_id: str,
|
|
file_contents: Dict[str, str],
|
|
file_contexts: List[Dict[str, Any]],
|
|
add_log_func = None,
|
|
read_complete: bool = False,
|
|
start_pos: int = None,
|
|
end_pos: int = None,
|
|
page_numbers: List[int] = None
|
|
) -> Optional[str]:
|
|
# Diese Funktion wird vorerst nicht benötigt
|
|
return None
|
|
""" |