gateway/modules/chat_agent_coder.py
2025-04-20 22:22:22 +02:00

795 lines
No EOL
34 KiB
Python

"""
Coder-Agent für die Entwicklung und Ausführung von Python-Code.
Angepasst für die neue chat.py Architektur und chat_registry.py.
"""
import logging
import json
import re
import uuid
import os
import subprocess
import tempfile
import shutil
import sys
from typing import Dict, Any, List, Optional, Tuple
from modules.chat_registry import AgentBase
logger = logging.getLogger(__name__)
class AgentCoder(AgentBase):
"""Agent für die Entwicklung und Ausführung von Python-Code"""
def __init__(self):
"""Initialisiert den Coder-Agent"""
super().__init__()
self.name = "Python Code Agent"
self.capabilities = "code_development,data_processing,file_processing,automation"
# Executor-Einstellungen
self.executor_timeout = 60 # Sekunden
self.executor_memory_limit = 512 # MB
# KI-Service-Einstellungen
self.ai_temperature = 0.1 # Niedrigere Temperatur für deterministische Codegenerierung
# Auto-Korrektur-Einstellungen
self.max_correction_attempts = 3 # Maximale Anzahl von Korrekturversuchen
def get_agent_info(self) -> Dict[str, Any]:
"""Gibt Agent-Informationen für die Registry zurück"""
info = super().get_config()
return info
async def process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Verarbeitet eine Nachricht zur Entwicklung und Ausführung von Python-Code.
Args:
message: Die zu verarbeitende Nachricht
context: Zusätzliche Kontextinformationen
Returns:
Antwortnachricht
"""
# Workflow-ID aus Kontext oder Nachricht extrahieren
workflow_id = context.get("workflow_id") if context else message.get("workflow_id", "unknown")
# Antwortstruktur erstellen
response = {
"role": "assistant",
"content": "",
"agent_name": self.name,
"workflow_id": workflow_id,
"documents": []
}
try:
# Inhalt und Dokumente extrahieren
content = message.get("content", "")
documents = message.get("documents", [])
# Code basierend auf dem Nachrichteninhalt mit KI generieren
logger.info("Generiere neuen Code mit KI")
# Code mit KI generieren
code_to_execute, requirements = await self._generate_code_from_prompt(content, documents)
if not code_to_execute:
logger.warning("KI konnte keinen Code generieren")
response["content"] = "Ich konnte basierend auf Ihrer Anfrage keinen ausführbaren Code generieren. Bitte geben Sie detailliertere Anweisungen."
return response
logger.info(f"Code mit KI generiert ({len(code_to_execute)} Zeichen)")
# Code-Datei-Dokument erstellen
code_doc_id = f"code_{uuid.uuid4()}"
code_filename = "generated_code.py"
code_document = {
"id": code_doc_id,
"source": {
"type": "generated",
"id": code_doc_id,
"name": code_filename,
"content_type": "text/x-python"
},
"contents": [{
"type": "text",
"text": code_to_execute,
"is_extracted": True
}]
}
# Code-Dokument zur Antwort hinzufügen
response["documents"].append(code_document)
logger.info(f"Code-Datei '{code_filename}' zur Antwort hinzugefügt")
# Code mit Auto-Korrektur-Schleife ausführen
if code_to_execute:
# Ausführungskontext vorbereiten
execution_context = {
"workflow_id": workflow_id,
"documents": documents,
"message": message
}
# Verbesserte Ausführung mit Auto-Korrektur
result, attempts_info = await self._execute_with_auto_correction(
code_to_execute,
requirements,
execution_context,
content # Originaler Prompt/Nachricht
)
# Antwort basierend auf dem endgültigen Ergebnis vorbereiten (Erfolg oder Fehler)
if result.get("success", False):
# Code-Ausführung erfolgreich
output = result.get("output", "")
execution_result = result.get("result")
logger.info("Code erfolgreich ausgeführt")
# Antwortinhalt formatieren
response_content = f"## Code erfolgreich ausgeführt"
# Informationen zu Korrekturversuchen hinzufügen, falls Korrekturen vorgenommen wurden
if attempts_info and len(attempts_info) > 1:
response_content += f" (nach {len(attempts_info)-1} Korrekturversuchen)"
response_content += "\n\n"
# Den ausgeführten Code einbeziehen
response_content += f"### Ausgeführter Code\n\n```python\n{attempts_info[-1]['code']}\n```\n\n"
# Die Ausgabe einbeziehen, falls verfügbar
if output:
response_content += f"### Ausgabe\n\n```\n{output}\n```\n\n"
# Das Ausführungsergebnis einbeziehen, falls verfügbar
if execution_result:
result_str = json.dumps(execution_result, indent=2) if isinstance(execution_result, (dict, list)) else str(execution_result)
response_content += f"### Ergebnis\n\n```\n{result_str}\n```\n\n"
response["content"] = response_content
else:
# Code-Ausführung nach allen Versuchen fehlgeschlagen
error = result.get("error", "Unbekannter Fehler")
logger.error(f"Fehler bei der Code-Ausführung nach allen Korrekturversuchen: {error}")
# Fehlerantwort formatieren
response_content = f"## Fehler bei der Code-Ausführung\n\n"
# Informationen zu Korrekturversuchen hinzufügen
if attempts_info:
response_content += f"Ich habe {len(attempts_info)} Versuche unternommen, den Code zu korrigieren, konnte aber nicht alle Probleme lösen.\n\n"
# Den letzten Versuch hinzufügen
response_content += f"### Letzter Code-Versuch\n\n```python\n{attempts_info[-1]['code']}\n```\n\n"
response_content += f"### Letzter Fehler\n\n```\n{attempts_info[-1]['error']}\n```\n\n"
# Empfehlung basierend auf dem Fehler hinzufügen
response_content += "### Empfehlung\n\n"
response_content += self._get_error_recommendation(error)
else:
# Nur den Code und den Fehler anzeigen
response_content += f"### Ausgeführter Code\n\n```python\n{code_to_execute}\n```\n\n"
response_content += f"### Fehler\n\n```\n{error}\n```\n\n"
response["content"] = response_content
else:
# Kein auszuführender Code
response["content"] = "Ich konnte keinen ausführbaren Code finden oder generieren. Bitte geben Sie Python-Code an oder erklären Sie Ihre Anforderungen klarer."
return response
except Exception as e:
error_msg = f"Fehler bei der Verarbeitung durch den Coder-Agent: {str(e)}"
logger.error(error_msg)
response["content"] = f"## Verarbeitungsfehler\n\n```\n{error_msg}\n```"
return response
async def _execute_with_auto_correction(
self,
initial_code: str,
requirements: List[str],
context: Dict[str, Any],
original_prompt: str
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Führt Code mit automatischer Fehlerkorrektur und Wiederholungsversuchen aus.
Args:
initial_code: Der anfängliche Python-Code
requirements: Liste erforderlicher Pakete
context: Zusätzlicher Kontext für die Ausführung
original_prompt: Die ursprüngliche Benutzeranfrage/Prompt
Returns:
Tuple aus (endgültiges Ausführungsergebnis, Liste von Versuchsinfo-Dictionarys)
"""
# Verfolgungs-Daten initialisieren
current_code = initial_code
current_requirements = requirements.copy() if requirements else []
attempts_info = []
# Mit Korrekturschleife ausführen
for attempt in range(1, self.max_correction_attempts + 1):
if attempt == 1:
logger.info(f"Führe Code aus (Versuch {attempt}/{self.max_correction_attempts})")
else:
logger.info(f"Führe korrigierten Code aus (Versuch {attempt}/{self.max_correction_attempts})")
# Aktuelle Code-Version ausführen
result = await self._execute_code(current_code, current_requirements, context)
# Versuchsinformationen aufzeichnen
attempts_info.append({
"attempt": attempt,
"code": current_code,
"error": result.get("error", ""),
"success": result.get("success", False)
})
# Prüfen, ob die Ausführung erfolgreich war
if result.get("success", False):
# Erfolg! Ergebnis und Versuchsinfo zurückgeben
return result, attempts_info
# Fehlgeschlagene Ausführung - prüfen, ob die maximale Versuchsgrenze erreicht wurde
if attempt >= self.max_correction_attempts:
logger.warning(f"Maximale Korrekturversuche ({self.max_correction_attempts}) erreicht, Aufgabe")
break
# Code basierend auf dem Fehler korrigieren
error_message = result.get("error", "Unbekannter Fehler")
logger.info(f"Versuche, Code-Fehler zu beheben: {error_message[:200]}...")
# Korrigierten Code generieren
corrected_code, new_requirements = await self._generate_code_correction(
current_code,
error_message,
original_prompt,
current_requirements
)
# Für den nächsten Versuch aktualisieren
if corrected_code:
current_code = corrected_code
# Neue Anforderungen hinzufügen
if new_requirements:
for req in new_requirements:
if req not in current_requirements:
current_requirements.append(req)
logger.info(f"Neue Anforderung hinzugefügt: {req}")
else:
# Korrektur konnte nicht generiert werden, Schleife beenden
logger.warning("Konnte keine Code-Korrektur generieren, Aufgabe")
break
# Wenn wir hierher gelangen, sind alle Versuche fehlgeschlagen - das letzte Ergebnis und die Versuchsinfo zurückgeben
return result, attempts_info
async def _generate_code_correction(
self,
code: str,
error_message: str,
original_prompt: str,
current_requirements: List[str] = None
) -> Tuple[str, List[str]]:
"""
Generiert eine korrigierte Version des Codes basierend auf Fehlermeldungen.
Args:
code: Der Code, der Fehler erzeugt hat
error_message: Die zu behebende Fehlermeldung
original_prompt: Die ursprüngliche Aufgabe/Anforderungen
current_requirements: Liste der aktuell erforderlichen Pakete
Returns:
Tuple aus (korrigierter Code, neue Anforderungsliste)
"""
try:
# Detaillierten Prompt für Code-Korrektur erstellen
correction_prompt = f"""Du musst einen Fehler in Python-Code beheben. Der Code wurde für diese Aufgabe geschrieben:
ORIGINALE AUFGABE:
{original_prompt}
AKTUELLER CODE:
```python
{code}
```
FEHLERMELDUNG:
```
{error_message}
```
AKTUELLE ANFORDERUNGEN: {', '.join(current_requirements) if current_requirements else "Keine"}
Deine Aufgabe ist es, den Fehler zu analysieren und eine korrigierte Version des Codes bereitzustellen.
Konzentriere dich speziell auf die Behebung des Fehlers unter Beibehaltung der ursprünglichen Funktionalität.
Häufige Korrekturen sind:
- Behebung von Syntaxfehlern (fehlende Klammern, Einrückung usw.)
- Lösung von Import-Fehlern durch Hinzufügen geeigneter Anforderungen
- Korrektur von Dateipfaden oder Behandlung von "Datei nicht gefunden"-Fehlern
- Hinzufügen von Fehlerbehandlung für bestimmte Randfälle
- Behebung logischer Fehler im Code
FORMATIERUNGSHINWEISE:
1. Gib NUR den vollständigen korrigierten Python-Code an OHNE Erklärungen
2. Verwende KEINE Codeblock-Markierungen wie ```python oder ```
3. Erkläre NICHT, was der Code davor oder danach tut
4. Füge KEINEN Text hinzu, der kein gültiger Python-Code ist
5. Beginne deine Antwort direkt mit dem gültigen Python-Code
6. Beende deine Antwort mit gültigem Python-Code
Wenn du neue erforderliche Pakete hinzufügen musst, platziere sie in einem speziell formatierten Kommentar am Anfang deines Codes wie folgt:
# REQUIREMENTS: paket1,paket2,paket3
Deine gesamte Antwort muss gültiges Python sein, das ohne Änderungen ausgeführt werden kann.
"""
# Nachrichten für die API erstellen
messages = [
{"role": "system", "content": "Du bist ein Python-Debugging-Experte. Du gibst NUR sauberen, fehlerfreien Python-Code zurück, ohne Erklärungen, Markdown-Formatierung oder Text, der kein Code ist. Deine Antwort sollte nur gültiger, korrigierter Python-Code sein, der direkt ausgeführt werden kann."},
{"role": "user", "content": correction_prompt}
]
# API mit sehr niedriger Temperatur für deterministische Korrekturen aufrufen
generated_content = await self.ai_service.call_api(
messages,
temperature=0.1
)
# Den generierten Inhalt bereinigen, um sicherzustellen, dass es sich nur um gültigen Python-Code handelt
fixed_code = self._clean_code(generated_content)
# Anforderungen aus speziellem Kommentar am Anfang des Codes extrahieren
new_requirements = []
for line in fixed_code.split('\n'):
if line.strip().startswith("# REQUIREMENTS:"):
req_str = line.replace("# REQUIREMENTS:", "").strip()
new_requirements = [r.strip() for r in req_str.split(',') if r.strip()]
break
return fixed_code, new_requirements
except Exception as e:
logging.error(f"Fehler bei der Generierung der Code-Korrektur: {str(e)}")
# None zurückgeben, um Fehler anzuzeigen
return None, []
def _clean_code(self, code: str) -> str:
"""
Bereinigt Code durch Entfernen von Markdown-Codeblock-Markierungen und anderen Formatierungsartefakten.
Args:
code: Der zu bereinigende Code-String
Returns:
Bereinigter Code-String
"""
# Codeblock-Markierungen am Anfang/Ende entfernen
code = re.sub(r'^```(?:python)?\s*', '', code)
code = re.sub(r'```\s*$', '', code)
# Zeilen in umgekehrter Reihenfolge durchgehen, um dem Ende zu beginnen
lines = code.split('\n')
clean_lines = []
in_trailing_markdown = False
for line in reversed(lines):
stripped = line.strip()
# Prüfen, ob diese Zeile nur Backticks enthält (``` oder ` oder ``)
if re.match(r'^`{1,3}$', stripped):
in_trailing_markdown = True
continue
# Wenn wir tatsächlichen Code erreicht haben, keine nachfolgende Markdown-Berücksichtigung mehr
if stripped and not in_trailing_markdown:
in_trailing_markdown = False
# Diese Zeile hinzufügen, wenn sie nicht Teil von nachfolgendem Markdown ist
if not in_trailing_markdown:
clean_lines.insert(0, line)
# Zeilen wieder zusammenfügen
clean_code = '\n'.join(clean_lines)
# Endgültige Bereinigung für alle restlichen Backticks
clean_code = re.sub(r'`{1,3}\s*, ', clean_code)
return clean_code.strip()
async def _generate_code_from_prompt(self, prompt: str, documents: List[Dict[str, Any]]) -> Tuple[str, List[str]]:
"""
Generiert Python-Code aus einem Prompt mithilfe des KI-Dienstes.
Args:
prompt: Der Prompt, aus dem Code generiert wird
documents: Mit dem Prompt verbundene Dokumente
Returns:
Tuple aus (generierter Python-Code, erforderliche Pakete)
"""
try:
# Prompt für die Codegenerierung vorbereiten
ai_prompt = f"""Generiere Python-Code, um die folgende Aufgabe zu lösen:
{prompt}
Verfügbare Eingabedateien:
"""
# Informationen über verfügbare Dokumente hinzufügen
if documents:
for i, doc in enumerate(documents):
source = doc.get("source", {})
doc_name = source.get("name", f"Dokument {i+1}")
doc_type = source.get("content_type", "unbekannt")
doc_id = source.get("id", "")
ai_prompt += f"- {doc_name} (Typ: {doc_type}, ID: {doc_id})\n"
else:
ai_prompt += "Keine Eingabedateien verfügbar.\n"
ai_prompt += """
WICHTIGE ANFORDERUNGEN:
1. Dein Code MUSS eine 'result'-Variable definieren, um das endgültige Ergebnis zu speichern.
2. Am Ende deines Skripts sollte die result-Variable ausgegeben werden.
3. Mache deine 'result'-Variable zu einem Dictionary oder einer anderen JSON-serialisierbaren Datenstruktur, die alle relevanten Ausgaben enthält.
4. Kommentiere den Code gut, um wichtige Operationen zu erklären.
5. Mache deinen Code vollständig und in sich geschlossen.
6. Füge eine angemessene Fehlerbehandlung hinzu.
FORMATIERUNGSANWEISUNGEN:
- Gib NUR den Python-Code zurück, OHNE Einleitung, Erklärung oder Abschlusstext
- Verwende KEINE Codeblock-Markierungen wie ```python oder ```
- Erkläre NICHT, was der Code davor oder danach tut
- Füge KEINEN Text hinzu, der kein gültiger Python-Code ist
- Beginne deine Antwort direkt mit gültigem Python-Code
- Beende deine Antwort mit gültigem Python-Code
Für erforderliche Pakete platziere sie in einem speziell formatierten Kommentar am Anfang deines Codes in einer Zeile wie folgt:
# REQUIREMENTS: pandas,numpy,matplotlib,requests
Deine gesamte Antwort muss gültiges Python sein, das ohne Änderungen ausgeführt werden kann.
"""
# Nachrichten für die API erstellen
messages = [
{"role": "system", "content": "Du bist ein Python-Codegenerator, der NUR sauberen, ausführbaren Python-Code ohne Erklärungen, Markdown-Formatierung oder Nicht-Code-Text liefert. Deine Antwort sollte ausschließlich aus gültigem Python-Code bestehen, der direkt ausgeführt werden kann."},
{"role": "user", "content": ai_prompt}
]
# API aufrufen
logging.info(f"KI-API aufrufen, um Code zu generieren")
generated_content = await self.ai_service.call_api(messages, temperature=self.ai_temperature)
# Den generierten Inhalt bereinigen, um sicherzustellen, dass es sich nur um gültigen Python-Code handelt
code = self._clean_code(generated_content)
# Anforderungen aus speziellem Kommentar am Anfang des Codes extrahieren
requirements = []
for line in code.split('\n'):
if line.strip().startswith("# REQUIREMENTS:"):
req_str = line.replace("# REQUIREMENTS:", "").strip()
requirements = [r.strip() for r in req_str.split(',') if r.strip()]
break
return code, requirements
except Exception as e:
logging.error(f"Fehler bei der Generierung von Code mit KI: {str(e)}")
# Grundlegenden Fehlerbehandlungscode und keine Anforderungen zurückgeben
error_str = str(e).replace('"', '\\"')
return f"""
# Fehler bei der Codegenerierung
print(f"Bei der Codegenerierung ist ein Fehler aufgetreten: {error_str}")
# Fehlerergebnis zurückgeben
result = {{"error": "Codegenerierung fehlgeschlagen", "message": "{error_str}"}}
""", []
async def _execute_code(self, code: str, requirements: List[str] = None, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Führt Python-Code mit dem SimpleCodeExecutor aus.
Args:
code: Der auszuführende Python-Code
requirements: Liste erforderlicher Pakete
context: Zusätzlicher Kontext für die Ausführung
Returns:
Ergebnis der Codeausführung
"""
# Workflow-ID abrufen und Logging einrichten
workflow_id = context.get("workflow_id", "") if context else ""
try:
# Liste blockierter Pakete für die Sicherheit
blocked_packages = [
"cryptography", "flask", "django", "tornado", # Sicherheitsrisiken
"tensorflow", "pytorch", "scikit-learn" # Ressourcenintensive Pakete
]
# SimpleCodeExecutor mit Anforderungen und workflow_id für Persistenz initialisieren
executor = SimpleCodeExecutor(
workflow_id=workflow_id,
timeout=self.executor_timeout,
max_memory_mb=self.executor_memory_limit,
requirements=requirements,
blocked_packages=blocked_packages,
ai_service=self.ai_service
)
# Eingabedaten für den Code vorbereiten
input_data = {"context": context, "workflow_id": workflow_id}
# Code ausführen
result = executor.execute_code(code, input_data)
# Nicht-persistente Umgebungen bereinigen
if not executor.is_persistent:
executor.cleanup()
return result
except Exception as e:
error_message = f"Fehler bei der Codeausführung: {str(e)}"
logger.error(error_message)
return {
"success": False,
"output": "",
"error": error_message,
"result": None
}
def _get_error_recommendation(self, error_message: str) -> str:
"""Generiert Empfehlungen basierend auf der Fehlermeldung."""
if "ImportError" in error_message or "ModuleNotFoundError" in error_message:
return """
Versuchen Sie, Standardbibliotheken oder häufig verwendete Datenanalysemodule zu verwenden.
"""
elif "PermissionError" in error_message:
return """
Der Code hat nicht die notwendigen Berechtigungen, um auf Dateien oder Verzeichnisse zuzugreifen.
"""
elif "SyntaxError" in error_message:
return """
Es gibt einen Syntaxfehler im Code. Überprüfen Sie auf fehlende Klammern, Anführungszeichen, Doppelpunkte oder Einrückungsfehler.
"""
elif "FileNotFoundError" in error_message:
return """
Eine Datei konnte nicht gefunden werden. Überprüfen Sie den Dateipfad und stellen Sie sicher, dass die Datei existiert.
"""
else:
return """
Um den Fehler zu beheben:
1. Überprüfen Sie die genaue Fehlermeldung
2. Vereinfachen Sie den Code und testen Sie schrittweise
3. Verwenden Sie try/except-Blöcke für fehleranfällige Operationen
"""
class SimpleCodeExecutor:
"""
Ein vereinfachter Executor, der Python-Code in isolierten virtuellen Umgebungen ausführt.
"""
# Klassenvariable zum Speichern von Workflow-Umgebungen für die Persistenz
_workflow_environments = {}
def __init__(self,
workflow_id: str = None,
timeout: int = 30,
max_memory_mb: int = 512,
requirements: List[str] = None,
blocked_packages: List[str] = None,
ai_service = None):
"""
Initialisiert den SimpleCodeExecutor.
Args:
workflow_id: Optionale Workflow-ID für persistente Umgebungen
timeout: Maximale Ausführungszeit in Sekunden
max_memory_mb: Maximaler Speicher in MB
requirements: Liste der zu installierenden Pakete
blocked_packages: Liste blockierter Pakete
"""
self.workflow_id = workflow_id
self.timeout = timeout
self.max_memory_mb = max_memory_mb
self.temp_dir = None
self.requirements = requirements or []
self.blocked_packages = blocked_packages or [
"cryptography", "flask", "django", "tornado", # Sicherheitsrisiken
"tensorflow", "pytorch", "scikit-learn" # Ressourcenintensive Pakete
]
self.is_persistent = workflow_id is not None
self.ai_service = ai_service
def _create_venv(self) -> str:
"""Erstellt eine virtuelle Umgebung und gibt den Pfad zurück."""
# Prüfen auf bestehende Umgebung bei Verwendung von workflow_id
if self.workflow_id:
self.is_persistent = True
existing_env = self._workflow_environments.get(self.workflow_id)
if existing_env and os.path.exists(existing_env):
logger.info(f"Wiederverwendung bestehender virtueller Umgebung: {existing_env}")
self.temp_dir = os.path.dirname(existing_env)
return existing_env
# Neue Umgebung erstellen
venv_parent_dir = tempfile.mkdtemp(prefix="code_exec_")
self.temp_dir = venv_parent_dir
venv_path = os.path.join(venv_parent_dir, "venv")
try:
# Virtuelle Umgebung erstellen
subprocess.run([sys.executable, "-m", "venv", venv_path],
check=True,
capture_output=True)
# Umgebungspfad speichern, wenn für einen bestimmten Workflow
if self.workflow_id:
self._workflow_environments[self.workflow_id] = venv_path
return venv_path
except subprocess.CalledProcessError as e:
logger.error(f"Fehler beim Erstellen der virtuellen Umgebung: {e}")
raise RuntimeError(f"Virtuelle Umgebung konnte nicht erstellt werden: {e}")
def _get_python_executable(self, venv_path: str) -> str:
"""Gibt den Pfad zum Python-Executable in der virtuellen Umgebung zurück."""
if os.name == 'nt': # Windows
return os.path.join(venv_path, "Scripts", "python.exe")
else: # Unix/Linux
return os.path.join(venv_path, "bin", "python")
def _extract_required_packages(self, code: str) -> List[str]:
"""Extrahiert erforderliche Pakete aus REQUIREMENTS-Kommentaren in der ersten Codezeile"""
packages = set()
# Nach speziellem REQUIREMENTS-Kommentar suchen
first_lines = code.split('\n')[:5] # Nur die ersten Zeilen prüfen
for line in first_lines:
if line.strip().startswith("# REQUIREMENTS:"):
req_str = line.replace("# REQUIREMENTS:", "").strip()
for pkg in req_str.split(','):
if pkg.strip():
packages.add(pkg.strip())
return list(packages)
def execute_code(self, code: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Führt Python-Code in einer isolierten Umgebung aus.
Args:
code: Auszuführender Python-Code
input_data: Optionale Eingabedaten für den Code
Returns:
Dictionary mit Ausführungsergebnissen
"""
logger.info(f"Führe Code mit workflow_id aus: {self.workflow_id}")
# Virtuelle Umgebung erstellen oder wiederverwenden
venv_path = self._create_venv()
# Datei für den Code erstellen
code_id = uuid.uuid4().hex[:8]
code_file = os.path.join(self.temp_dir, f"code_{code_id}.py")
# Code ohne zusätzlichen Loader-Code schreiben
with open(code_file, "w", encoding="utf-8") as f:
f.write(code)
# Python-Executable holen
python_executable = self._get_python_executable(venv_path)
logger.info(f"Verwende Python-Executable: {python_executable}")
# Code ausführen
try:
# Code aus Root-Verzeichnis ausführen
working_dir = os.path.dirname(code_file)
process = subprocess.run(
[python_executable, code_file],
timeout=self.timeout,
capture_output=True,
text=True,
cwd=working_dir
)
# Ausgabe verarbeiten
stdout = process.stdout
stderr = process.stderr
# Ergebnis aus stdout holen, falls verfügbar
result_data = None
if process.returncode == 0 and stdout:
try:
# Nach der letzten Zeile suchen, die JSON sein könnte
for line in reversed(stdout.strip().split('\n')):
line = line.strip()
if line and line[0] in '{[' and line[-1] in '}]':
try:
result_data = json.loads(line)
# Erfolgreich geparste JSON-Ergebnis verwenden
break
except json.JSONDecodeError:
# Kein gültiges JSON, mit nächster Zeile fortfahren
continue
except Exception as e:
logger.warning(f"Fehler beim Parsen des Ergebnisses aus stdout: {str(e)}")
# Ergebnisdictionary erstellen
execution_result = {
"success": process.returncode == 0,
"output": stdout,
"error": stderr if process.returncode != 0 else "",
"result": result_data,
"exit_code": process.returncode
}
except subprocess.TimeoutExpired:
logger.error(f"Ausführung nach {self.timeout} Sekunden abgelaufen")
execution_result = {
"success": False,
"output": "",
"error": f"Ausführung abgelaufen (Timeout nach {self.timeout} Sekunden)",
"result": None,
"exit_code": -1
}
except Exception as e:
logger.error(f"Ausführungsfehler: {str(e)}")
execution_result = {
"success": False,
"output": "",
"error": f"Ausführungsfehler: {str(e)}",
"result": None,
"exit_code": -1
}
# Temporäre Code-Datei bereinigen
try:
if os.path.exists(code_file):
os.remove(code_file)
except Exception as e:
logger.warning(f"Fehler beim Bereinigen der temporären Code-Datei: {e}")
return execution_result
def cleanup(self):
"""Temporäre Ressourcen bereinigen."""
# Bereinigung für persistente Umgebungen überspringen
if self.is_persistent and self.workflow_id:
logger.info(f"Überspringe Bereinigung für persistente Umgebung von Workflow {self.workflow_id}")
return
# Temporäres Verzeichnis bereinigen
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
logger.info(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}")
except Exception as e:
logger.warning(f"Temporäres Verzeichnis {self.temp_dir} konnte nicht gelöscht werden: {e}")
def __del__(self):
"""Bereinigung während der Garbage Collection."""
self.cleanup()
# Singleton-Instanz
_coder_agent = None
def get_coder_agent():
"""Gibt eine Singleton-Instanz des Coder-Agenten zurück"""
global _coder_agent
if _coder_agent is None:
_coder_agent = AgentCoder()
return _coder_agent