""" Coder-Agent für die Entwicklung und Ausführung von Python-Code. Angepasst für die neue chat.py Architektur und chat_registry.py. """ import logging import json import re import uuid import os import subprocess import tempfile import shutil import sys from typing import Dict, Any, List, Optional, Tuple from modules.chat_registry import AgentBase logger = logging.getLogger(__name__) class AgentCoder(AgentBase): """Agent für die Entwicklung und Ausführung von Python-Code""" def __init__(self): """Initialisiert den Coder-Agent""" super().__init__() self.name = "Python Code Agent" self.capabilities = "code_development,data_processing,file_processing,automation" self.result_format = "python_code" # Executor-Einstellungen self.executor_timeout = 60 # Sekunden self.executor_memory_limit = 512 # MB # KI-Service-Einstellungen self.ai_temperature = 0.1 # Niedrigere Temperatur für deterministische Codegenerierung # Auto-Korrektur-Einstellungen self.max_correction_attempts = 3 # Maximale Anzahl von Korrekturversuchen def get_agent_info(self) -> Dict[str, Any]: """Gibt Agent-Informationen für die Registry zurück""" info = super().get_config() info.update({ "metadata": { "timeout": self.executor_timeout, "memory_limit": self.executor_memory_limit, "max_correction_attempts": self.max_correction_attempts } }) return info async def process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: """ Verarbeitet eine Nachricht zur Entwicklung und Ausführung von Python-Code. Args: message: Die zu verarbeitende Nachricht context: Zusätzliche Kontextinformationen Returns: Antwortnachricht """ # Workflow-ID aus Kontext oder Nachricht extrahieren workflow_id = context.get("workflow_id") if context else message.get("workflow_id", "unknown") # Antwortstruktur erstellen response = { "role": "assistant", "content": "", "agent_name": self.name, "result_format": self.result_format, "workflow_id": workflow_id, "documents": [] } try: # Inhalt und Dokumente extrahieren content = message.get("content", "") documents = message.get("documents", []) # Code basierend auf dem Nachrichteninhalt mit KI generieren logger.info("Generiere neuen Code mit KI") # Code mit KI generieren code_to_execute, requirements = await self._generate_code_from_prompt(content, documents) if not code_to_execute: logger.warning("KI konnte keinen Code generieren") response["content"] = "Ich konnte basierend auf Ihrer Anfrage keinen ausführbaren Code generieren. Bitte geben Sie detailliertere Anweisungen." return response logger.info(f"Code mit KI generiert ({len(code_to_execute)} Zeichen)") # Code-Datei-Dokument erstellen code_doc_id = f"code_{uuid.uuid4()}" code_filename = "generated_code.py" code_document = { "id": code_doc_id, "source": { "type": "generated", "id": code_doc_id, "name": code_filename, "content_type": "text/x-python" }, "contents": [{ "type": "text", "text": code_to_execute, "is_extracted": True }] } # Code-Dokument zur Antwort hinzufügen response["documents"].append(code_document) logger.info(f"Code-Datei '{code_filename}' zur Antwort hinzugefügt") # Code mit Auto-Korrektur-Schleife ausführen if code_to_execute: # Ausführungskontext vorbereiten execution_context = { "workflow_id": workflow_id, "documents": documents, "message": message } # Verbesserte Ausführung mit Auto-Korrektur result, attempts_info = await self._execute_with_auto_correction( code_to_execute, requirements, execution_context, content # Originaler Prompt/Nachricht ) # Antwort basierend auf dem endgültigen Ergebnis vorbereiten (Erfolg oder Fehler) if result.get("success", False): # Code-Ausführung erfolgreich output = result.get("output", "") execution_result = result.get("result") logger.info("Code erfolgreich ausgeführt") # Antwortinhalt formatieren response_content = f"## Code erfolgreich ausgeführt" # Informationen zu Korrekturversuchen hinzufügen, falls Korrekturen vorgenommen wurden if attempts_info and len(attempts_info) > 1: response_content += f" (nach {len(attempts_info)-1} Korrekturversuchen)" response_content += "\n\n" # Den ausgeführten Code einbeziehen response_content += f"### Ausgeführter Code\n\n```python\n{attempts_info[-1]['code']}\n```\n\n" # Die Ausgabe einbeziehen, falls verfügbar if output: response_content += f"### Ausgabe\n\n```\n{output}\n```\n\n" # Das Ausführungsergebnis einbeziehen, falls verfügbar if execution_result: result_str = json.dumps(execution_result, indent=2) if isinstance(execution_result, (dict, list)) else str(execution_result) response_content += f"### Ergebnis\n\n```\n{result_str}\n```\n\n" response["content"] = response_content else: # Code-Ausführung nach allen Versuchen fehlgeschlagen error = result.get("error", "Unbekannter Fehler") logger.error(f"Fehler bei der Code-Ausführung nach allen Korrekturversuchen: {error}") # Fehlerantwort formatieren response_content = f"## Fehler bei der Code-Ausführung\n\n" # Informationen zu Korrekturversuchen hinzufügen if attempts_info: response_content += f"Ich habe {len(attempts_info)} Versuche unternommen, den Code zu korrigieren, konnte aber nicht alle Probleme lösen.\n\n" # Den letzten Versuch hinzufügen response_content += f"### Letzter Code-Versuch\n\n```python\n{attempts_info[-1]['code']}\n```\n\n" response_content += f"### Letzter Fehler\n\n```\n{attempts_info[-1]['error']}\n```\n\n" # Empfehlung basierend auf dem Fehler hinzufügen response_content += "### Empfehlung\n\n" response_content += self._get_error_recommendation(error) else: # Nur den Code und den Fehler anzeigen response_content += f"### Ausgeführter Code\n\n```python\n{code_to_execute}\n```\n\n" response_content += f"### Fehler\n\n```\n{error}\n```\n\n" response["content"] = response_content else: # Kein auszuführender Code response["content"] = "Ich konnte keinen ausführbaren Code finden oder generieren. Bitte geben Sie Python-Code an oder erklären Sie Ihre Anforderungen klarer." return response except Exception as e: error_msg = f"Fehler bei der Verarbeitung durch den Coder-Agent: {str(e)}" logger.error(error_msg) response["content"] = f"## Verarbeitungsfehler\n\n```\n{error_msg}\n```" return response async def _execute_with_auto_correction( self, initial_code: str, requirements: List[str], context: Dict[str, Any], original_prompt: str ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: """ Führt Code mit automatischer Fehlerkorrektur und Wiederholungsversuchen aus. Args: initial_code: Der anfängliche Python-Code requirements: Liste erforderlicher Pakete context: Zusätzlicher Kontext für die Ausführung original_prompt: Die ursprüngliche Benutzeranfrage/Prompt Returns: Tuple aus (endgültiges Ausführungsergebnis, Liste von Versuchsinfo-Dictionarys) """ # Verfolgungs-Daten initialisieren current_code = initial_code current_requirements = requirements.copy() if requirements else [] attempts_info = [] # Mit Korrekturschleife ausführen for attempt in range(1, self.max_correction_attempts + 1): if attempt == 1: logger.info(f"Führe Code aus (Versuch {attempt}/{self.max_correction_attempts})") else: logger.info(f"Führe korrigierten Code aus (Versuch {attempt}/{self.max_correction_attempts})") # Aktuelle Code-Version ausführen result = await self._execute_code(current_code, current_requirements, context) # Versuchsinformationen aufzeichnen attempts_info.append({ "attempt": attempt, "code": current_code, "error": result.get("error", ""), "success": result.get("success", False) }) # Prüfen, ob die Ausführung erfolgreich war if result.get("success", False): # Erfolg! Ergebnis und Versuchsinfo zurückgeben return result, attempts_info # Fehlgeschlagene Ausführung - prüfen, ob die maximale Versuchsgrenze erreicht wurde if attempt >= self.max_correction_attempts: logger.warning(f"Maximale Korrekturversuche ({self.max_correction_attempts}) erreicht, Aufgabe") break # Code basierend auf dem Fehler korrigieren error_message = result.get("error", "Unbekannter Fehler") logger.info(f"Versuche, Code-Fehler zu beheben: {error_message[:200]}...") # Korrigierten Code generieren corrected_code, new_requirements = await self._generate_code_correction( current_code, error_message, original_prompt, current_requirements ) # Für den nächsten Versuch aktualisieren if corrected_code: current_code = corrected_code # Neue Anforderungen hinzufügen if new_requirements: for req in new_requirements: if req not in current_requirements: current_requirements.append(req) logger.info(f"Neue Anforderung hinzugefügt: {req}") else: # Korrektur konnte nicht generiert werden, Schleife beenden logger.warning("Konnte keine Code-Korrektur generieren, Aufgabe") break # Wenn wir hierher gelangen, sind alle Versuche fehlgeschlagen - das letzte Ergebnis und die Versuchsinfo zurückgeben return result, attempts_info async def _generate_code_correction( self, code: str, error_message: str, original_prompt: str, current_requirements: List[str] = None ) -> Tuple[str, List[str]]: """ Generiert eine korrigierte Version des Codes basierend auf Fehlermeldungen. Args: code: Der Code, der Fehler erzeugt hat error_message: Die zu behebende Fehlermeldung original_prompt: Die ursprüngliche Aufgabe/Anforderungen current_requirements: Liste der aktuell erforderlichen Pakete Returns: Tuple aus (korrigierter Code, neue Anforderungsliste) """ try: # Detaillierten Prompt für Code-Korrektur erstellen correction_prompt = f"""Du musst einen Fehler in Python-Code beheben. Der Code wurde für diese Aufgabe geschrieben: ORIGINALE AUFGABE: {original_prompt} AKTUELLER CODE: ```python {code} ``` FEHLERMELDUNG: ``` {error_message} ``` AKTUELLE ANFORDERUNGEN: {', '.join(current_requirements) if current_requirements else "Keine"} Deine Aufgabe ist es, den Fehler zu analysieren und eine korrigierte Version des Codes bereitzustellen. Konzentriere dich speziell auf die Behebung des Fehlers unter Beibehaltung der ursprünglichen Funktionalität. Häufige Korrekturen sind: - Behebung von Syntaxfehlern (fehlende Klammern, Einrückung usw.) - Lösung von Import-Fehlern durch Hinzufügen geeigneter Anforderungen - Korrektur von Dateipfaden oder Behandlung von "Datei nicht gefunden"-Fehlern - Hinzufügen von Fehlerbehandlung für bestimmte Randfälle - Behebung logischer Fehler im Code FORMATIERUNGSHINWEISE: 1. Gib NUR den vollständigen korrigierten Python-Code an OHNE Erklärungen 2. Verwende KEINE Codeblock-Markierungen wie ```python oder ``` 3. Erkläre NICHT, was der Code davor oder danach tut 4. Füge KEINEN Text hinzu, der kein gültiger Python-Code ist 5. Beginne deine Antwort direkt mit dem gültigen Python-Code 6. Beende deine Antwort mit gültigem Python-Code Wenn du neue erforderliche Pakete hinzufügen musst, platziere sie in einem speziell formatierten Kommentar am Anfang deines Codes wie folgt: # REQUIREMENTS: paket1,paket2,paket3 Deine gesamte Antwort muss gültiges Python sein, das ohne Änderungen ausgeführt werden kann. """ # Nachrichten für die API erstellen messages = [ {"role": "system", "content": "Du bist ein Python-Debugging-Experte. Du gibst NUR sauberen, fehlerfreien Python-Code zurück, ohne Erklärungen, Markdown-Formatierung oder Text, der kein Code ist. Deine Antwort sollte nur gültiger, korrigierter Python-Code sein, der direkt ausgeführt werden kann."}, {"role": "user", "content": correction_prompt} ] # API mit sehr niedriger Temperatur für deterministische Korrekturen aufrufen generated_content = await self.ai_service.call_api( messages, temperature=0.1 ) # Den generierten Inhalt bereinigen, um sicherzustellen, dass es sich nur um gültigen Python-Code handelt fixed_code = self._clean_code(generated_content) # Anforderungen aus speziellem Kommentar am Anfang des Codes extrahieren new_requirements = [] for line in fixed_code.split('\n'): if line.strip().startswith("# REQUIREMENTS:"): req_str = line.replace("# REQUIREMENTS:", "").strip() new_requirements = [r.strip() for r in req_str.split(',') if r.strip()] break return fixed_code, new_requirements except Exception as e: logging.error(f"Fehler bei der Generierung der Code-Korrektur: {str(e)}") # None zurückgeben, um Fehler anzuzeigen return None, [] def _clean_code(self, code: str) -> str: """ Bereinigt Code durch Entfernen von Markdown-Codeblock-Markierungen und anderen Formatierungsartefakten. Args: code: Der zu bereinigende Code-String Returns: Bereinigter Code-String """ # Codeblock-Markierungen am Anfang/Ende entfernen code = re.sub(r'^```(?:python)?\s*', '', code) code = re.sub(r'```\s*$', '', code) # Zeilen in umgekehrter Reihenfolge durchgehen, um dem Ende zu beginnen lines = code.split('\n') clean_lines = [] in_trailing_markdown = False for line in reversed(lines): stripped = line.strip() # Prüfen, ob diese Zeile nur Backticks enthält (``` oder ` oder ``) if re.match(r'^`{1,3}$', stripped): in_trailing_markdown = True continue # Wenn wir tatsächlichen Code erreicht haben, keine nachfolgende Markdown-Berücksichtigung mehr if stripped and not in_trailing_markdown: in_trailing_markdown = False # Diese Zeile hinzufügen, wenn sie nicht Teil von nachfolgendem Markdown ist if not in_trailing_markdown: clean_lines.insert(0, line) # Zeilen wieder zusammenfügen clean_code = '\n'.join(clean_lines) # Endgültige Bereinigung für alle restlichen Backticks clean_code = re.sub(r'`{1,3}\s*, ', clean_code) return clean_code.strip() async def _generate_code_from_prompt(self, prompt: str, documents: List[Dict[str, Any]]) -> Tuple[str, List[str]]: """ Generiert Python-Code aus einem Prompt mithilfe des KI-Dienstes. Args: prompt: Der Prompt, aus dem Code generiert wird documents: Mit dem Prompt verbundene Dokumente Returns: Tuple aus (generierter Python-Code, erforderliche Pakete) """ try: # Prompt für die Codegenerierung vorbereiten ai_prompt = f"""Generiere Python-Code, um die folgende Aufgabe zu lösen: {prompt} Verfügbare Eingabedateien: """ # Informationen über verfügbare Dokumente hinzufügen if documents: for i, doc in enumerate(documents): source = doc.get("source", {}) doc_name = source.get("name", f"Dokument {i+1}") doc_type = source.get("content_type", "unbekannt") doc_id = source.get("id", "") ai_prompt += f"- {doc_name} (Typ: {doc_type}, ID: {doc_id})\n" else: ai_prompt += "Keine Eingabedateien verfügbar.\n" ai_prompt += """ WICHTIGE ANFORDERUNGEN: 1. Dein Code MUSS eine 'result'-Variable definieren, um das endgültige Ergebnis zu speichern. 2. Am Ende deines Skripts sollte die result-Variable ausgegeben werden. 3. Mache deine 'result'-Variable zu einem Dictionary oder einer anderen JSON-serialisierbaren Datenstruktur, die alle relevanten Ausgaben enthält. 4. Kommentiere den Code gut, um wichtige Operationen zu erklären. 5. Mache deinen Code vollständig und in sich geschlossen. 6. Füge eine angemessene Fehlerbehandlung hinzu. FORMATIERUNGSANWEISUNGEN: - Gib NUR den Python-Code zurück, OHNE Einleitung, Erklärung oder Abschlusstext - Verwende KEINE Codeblock-Markierungen wie ```python oder ``` - Erkläre NICHT, was der Code davor oder danach tut - Füge KEINEN Text hinzu, der kein gültiger Python-Code ist - Beginne deine Antwort direkt mit gültigem Python-Code - Beende deine Antwort mit gültigem Python-Code Für erforderliche Pakete platziere sie in einem speziell formatierten Kommentar am Anfang deines Codes in einer Zeile wie folgt: # REQUIREMENTS: pandas,numpy,matplotlib,requests Deine gesamte Antwort muss gültiges Python sein, das ohne Änderungen ausgeführt werden kann. """ # Nachrichten für die API erstellen messages = [ {"role": "system", "content": "Du bist ein Python-Codegenerator, der NUR sauberen, ausführbaren Python-Code ohne Erklärungen, Markdown-Formatierung oder Nicht-Code-Text liefert. Deine Antwort sollte ausschließlich aus gültigem Python-Code bestehen, der direkt ausgeführt werden kann."}, {"role": "user", "content": ai_prompt} ] # API aufrufen logging.info(f"KI-API aufrufen, um Code zu generieren") generated_content = await self.ai_service.call_api(messages, temperature=self.ai_temperature) # Den generierten Inhalt bereinigen, um sicherzustellen, dass es sich nur um gültigen Python-Code handelt code = self._clean_code(generated_content) # Anforderungen aus speziellem Kommentar am Anfang des Codes extrahieren requirements = [] for line in code.split('\n'): if line.strip().startswith("# REQUIREMENTS:"): req_str = line.replace("# REQUIREMENTS:", "").strip() requirements = [r.strip() for r in req_str.split(',') if r.strip()] break return code, requirements except Exception as e: logging.error(f"Fehler bei der Generierung von Code mit KI: {str(e)}") # Grundlegenden Fehlerbehandlungscode und keine Anforderungen zurückgeben error_str = str(e).replace('"', '\\"') return f""" # Fehler bei der Codegenerierung print(f"Bei der Codegenerierung ist ein Fehler aufgetreten: {error_str}") # Fehlerergebnis zurückgeben result = {{"error": "Codegenerierung fehlgeschlagen", "message": "{error_str}"}} """, [] async def _execute_code(self, code: str, requirements: List[str] = None, context: Dict[str, Any] = None) -> Dict[str, Any]: """ Führt Python-Code mit dem SimpleCodeExecutor aus. Args: code: Der auszuführende Python-Code requirements: Liste erforderlicher Pakete context: Zusätzlicher Kontext für die Ausführung Returns: Ergebnis der Codeausführung """ # Workflow-ID abrufen und Logging einrichten workflow_id = context.get("workflow_id", "") if context else "" try: # Liste blockierter Pakete für die Sicherheit blocked_packages = [ "cryptography", "flask", "django", "tornado", # Sicherheitsrisiken "tensorflow", "pytorch", "scikit-learn" # Ressourcenintensive Pakete ] # SimpleCodeExecutor mit Anforderungen und workflow_id für Persistenz initialisieren executor = SimpleCodeExecutor( workflow_id=workflow_id, timeout=self.executor_timeout, max_memory_mb=self.executor_memory_limit, requirements=requirements, blocked_packages=blocked_packages, ai_service=self.ai_service ) # Eingabedaten für den Code vorbereiten input_data = {"context": context, "workflow_id": workflow_id} # Code ausführen result = executor.execute_code(code, input_data) # Nicht-persistente Umgebungen bereinigen if not executor.is_persistent: executor.cleanup() return result except Exception as e: error_message = f"Fehler bei der Codeausführung: {str(e)}" logger.error(error_message) return { "success": False, "output": "", "error": error_message, "result": None } def _get_error_recommendation(self, error_message: str) -> str: """Generiert Empfehlungen basierend auf der Fehlermeldung.""" if "ImportError" in error_message or "ModuleNotFoundError" in error_message: return """ Versuchen Sie, Standardbibliotheken oder häufig verwendete Datenanalysemodule zu verwenden. """ elif "PermissionError" in error_message: return """ Der Code hat nicht die notwendigen Berechtigungen, um auf Dateien oder Verzeichnisse zuzugreifen. """ elif "SyntaxError" in error_message: return """ Es gibt einen Syntaxfehler im Code. Überprüfen Sie auf fehlende Klammern, Anführungszeichen, Doppelpunkte oder Einrückungsfehler. """ elif "FileNotFoundError" in error_message: return """ Eine Datei konnte nicht gefunden werden. Überprüfen Sie den Dateipfad und stellen Sie sicher, dass die Datei existiert. """ else: return """ Um den Fehler zu beheben: 1. Überprüfen Sie die genaue Fehlermeldung 2. Vereinfachen Sie den Code und testen Sie schrittweise 3. Verwenden Sie try/except-Blöcke für fehleranfällige Operationen """ class SimpleCodeExecutor: """ Ein vereinfachter Executor, der Python-Code in isolierten virtuellen Umgebungen ausführt. """ # Klassenvariable zum Speichern von Workflow-Umgebungen für die Persistenz _workflow_environments = {} def __init__(self, workflow_id: str = None, timeout: int = 30, max_memory_mb: int = 512, requirements: List[str] = None, blocked_packages: List[str] = None, ai_service = None): """ Initialisiert den SimpleCodeExecutor. Args: workflow_id: Optionale Workflow-ID für persistente Umgebungen timeout: Maximale Ausführungszeit in Sekunden max_memory_mb: Maximaler Speicher in MB requirements: Liste der zu installierenden Pakete blocked_packages: Liste blockierter Pakete """ self.workflow_id = workflow_id self.timeout = timeout self.max_memory_mb = max_memory_mb self.temp_dir = None self.requirements = requirements or [] self.blocked_packages = blocked_packages or [ "cryptography", "flask", "django", "tornado", # Sicherheitsrisiken "tensorflow", "pytorch", "scikit-learn" # Ressourcenintensive Pakete ] self.is_persistent = workflow_id is not None self.ai_service = ai_service def _create_venv(self) -> str: """Erstellt eine virtuelle Umgebung und gibt den Pfad zurück.""" # Prüfen auf bestehende Umgebung bei Verwendung von workflow_id if self.workflow_id: self.is_persistent = True existing_env = self._workflow_environments.get(self.workflow_id) if existing_env and os.path.exists(existing_env): logger.info(f"Wiederverwendung bestehender virtueller Umgebung: {existing_env}") self.temp_dir = os.path.dirname(existing_env) return existing_env # Neue Umgebung erstellen venv_parent_dir = tempfile.mkdtemp(prefix="code_exec_") self.temp_dir = venv_parent_dir venv_path = os.path.join(venv_parent_dir, "venv") try: # Virtuelle Umgebung erstellen subprocess.run([sys.executable, "-m", "venv", venv_path], check=True, capture_output=True) # Umgebungspfad speichern, wenn für einen bestimmten Workflow if self.workflow_id: self._workflow_environments[self.workflow_id] = venv_path return venv_path except subprocess.CalledProcessError as e: logger.error(f"Fehler beim Erstellen der virtuellen Umgebung: {e}") raise RuntimeError(f"Virtuelle Umgebung konnte nicht erstellt werden: {e}") def _get_python_executable(self, venv_path: str) -> str: """Gibt den Pfad zum Python-Executable in der virtuellen Umgebung zurück.""" if os.name == 'nt': # Windows return os.path.join(venv_path, "Scripts", "python.exe") else: # Unix/Linux return os.path.join(venv_path, "bin", "python") def _extract_required_packages(self, code: str) -> List[str]: """Extrahiert erforderliche Pakete aus REQUIREMENTS-Kommentaren in der ersten Codezeile""" packages = set() # Nach speziellem REQUIREMENTS-Kommentar suchen first_lines = code.split('\n')[:5] # Nur die ersten Zeilen prüfen for line in first_lines: if line.strip().startswith("# REQUIREMENTS:"): req_str = line.replace("# REQUIREMENTS:", "").strip() for pkg in req_str.split(','): if pkg.strip(): packages.add(pkg.strip()) return list(packages) def execute_code(self, code: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]: """ Führt Python-Code in einer isolierten Umgebung aus. Args: code: Auszuführender Python-Code input_data: Optionale Eingabedaten für den Code Returns: Dictionary mit Ausführungsergebnissen """ logger.info(f"Führe Code mit workflow_id aus: {self.workflow_id}") # Virtuelle Umgebung erstellen oder wiederverwenden venv_path = self._create_venv() # Datei für den Code erstellen code_id = uuid.uuid4().hex[:8] code_file = os.path.join(self.temp_dir, f"code_{code_id}.py") # Code ohne zusätzlichen Loader-Code schreiben with open(code_file, "w", encoding="utf-8") as f: f.write(code) # Python-Executable holen python_executable = self._get_python_executable(venv_path) logger.info(f"Verwende Python-Executable: {python_executable}") # Code ausführen try: # Code aus Root-Verzeichnis ausführen working_dir = os.path.dirname(code_file) process = subprocess.run( [python_executable, code_file], timeout=self.timeout, capture_output=True, text=True, cwd=working_dir ) # Ausgabe verarbeiten stdout = process.stdout stderr = process.stderr # Ergebnis aus stdout holen, falls verfügbar result_data = None if process.returncode == 0 and stdout: try: # Nach der letzten Zeile suchen, die JSON sein könnte for line in reversed(stdout.strip().split('\n')): line = line.strip() if line and line[0] in '{[' and line[-1] in '}]': try: result_data = json.loads(line) # Erfolgreich geparste JSON-Ergebnis verwenden break except json.JSONDecodeError: # Kein gültiges JSON, mit nächster Zeile fortfahren continue except Exception as e: logger.warning(f"Fehler beim Parsen des Ergebnisses aus stdout: {str(e)}") # Ergebnisdictionary erstellen execution_result = { "success": process.returncode == 0, "output": stdout, "error": stderr if process.returncode != 0 else "", "result": result_data, "exit_code": process.returncode } except subprocess.TimeoutExpired: logger.error(f"Ausführung nach {self.timeout} Sekunden abgelaufen") execution_result = { "success": False, "output": "", "error": f"Ausführung abgelaufen (Timeout nach {self.timeout} Sekunden)", "result": None, "exit_code": -1 } except Exception as e: logger.error(f"Ausführungsfehler: {str(e)}") execution_result = { "success": False, "output": "", "error": f"Ausführungsfehler: {str(e)}", "result": None, "exit_code": -1 } # Temporäre Code-Datei bereinigen try: if os.path.exists(code_file): os.remove(code_file) except Exception as e: logger.warning(f"Fehler beim Bereinigen der temporären Code-Datei: {e}") return execution_result def cleanup(self): """Temporäre Ressourcen bereinigen.""" # Bereinigung für persistente Umgebungen überspringen if self.is_persistent and self.workflow_id: logger.info(f"Überspringe Bereinigung für persistente Umgebung von Workflow {self.workflow_id}") return # Temporäres Verzeichnis bereinigen if self.temp_dir and os.path.exists(self.temp_dir): try: shutil.rmtree(self.temp_dir) logger.info(f"Temporäres Verzeichnis gelöscht: {self.temp_dir}") except Exception as e: logger.warning(f"Temporäres Verzeichnis {self.temp_dir} konnte nicht gelöscht werden: {e}") def __del__(self): """Bereinigung während der Garbage Collection.""" self.cleanup() # Singleton-Instanz _coder_agent = None def get_coder_agent(): """Gibt eine Singleton-Instanz des Coder-Agenten zurück""" global _coder_agent if _coder_agent is None: _coder_agent = AgentCoder() return _coder_agent