""" CoderAgent - A unified agent for developing and executing Python code. Includes code execution capabilities previously in separate modules. """ import logging import json import re import uuid import traceback import os import subprocess import tempfile import shutil import sys import pandas as pd from datetime import datetime from typing import List, Dict, Any, Optional, Tuple from modules.agentservice_base import BaseAgent from modules.agentservice_utils import FileUtils, WorkflowUtils, MessageUtils, LoggingUtils from connectors.connector_aichat_openai import ChatService from modules.agentservice_protocol import AgentMessage, AgentCommunicationProtocol logger = logging.getLogger(__name__) class SimpleCodeExecutor: """ A simplified executor that runs Python code in isolated virtual environments. """ # Class variable to store workflow environments for persistence _workflow_environments = {} def __init__(self, workflow_id: str = None, timeout: int = 30, max_memory_mb: int = 512, requirements: List[str] = None, blocked_packages: List[str] = None): """ Initialize the SimpleCodeExecutor. Args: workflow_id: Optional workflow ID for persistent environments timeout: Maximum execution time in seconds max_memory_mb: Maximum memory in MB requirements: List of packages to install blocked_packages: List of blocked packages """ self.workflow_id = workflow_id self.timeout = timeout self.max_memory_mb = max_memory_mb self.temp_dir = None self.requirements = requirements or [] self.blocked_packages = blocked_packages or [ "cryptography", "flask", "django", "tornado", # Security risks "tensorflow", "pytorch", "scikit-learn" # Resource intensive ] self.is_persistent = workflow_id is not None @classmethod def get_workflow_environment(cls, workflow_id: str) -> Optional[str]: """Get an existing workflow environment path if it exists.""" return cls._workflow_environments.get(workflow_id) @classmethod def set_workflow_environment(cls, workflow_id: str, env_path: str) -> None: """Store a workflow environment path.""" cls._workflow_environments[workflow_id] = env_path def _create_venv(self) -> str: """Creates a virtual environment and returns the path.""" # Check for existing environment if using workflow_id if self.workflow_id: self.is_persistent = True existing_env = self.get_workflow_environment(self.workflow_id) if existing_env and os.path.exists(existing_env): logger.info(f"Reusing existing virtual environment: {existing_env}") self.temp_dir = os.path.dirname(existing_env) return existing_env else: logger.info(f"Creating new environment for workflow {self.workflow_id}") # Create a new environment venv_parent_dir = tempfile.mkdtemp(prefix="simple_exec_") self.temp_dir = venv_parent_dir venv_path = os.path.join(venv_parent_dir, "venv") try: # Create virtual environment logger.info(f"Creating new virtual environment in {venv_path}") subprocess.run([sys.executable, "-m", "venv", venv_path], check=True, capture_output=True) # Store the environment path if this is for a specific workflow if self.workflow_id: logger.info(f"Registering new persistent environment for workflow {self.workflow_id}") self.set_workflow_environment(self.workflow_id, venv_path) return venv_path except subprocess.CalledProcessError as e: logger.error(f"Error creating virtual environment: {e}") raise RuntimeError(f"Could not create venv: {e}") def _get_pip_executable(self, venv_path: str) -> str: """Gets the path to the pip executable in the virtual environment.""" if os.name == 'nt': # Windows return os.path.join(venv_path, "Scripts", "pip.exe") else: # Unix/Linux return os.path.join(venv_path, "bin", "pip") def _get_python_executable(self, venv_path: str) -> str: """Gets the path to the Python executable in the virtual environment.""" if os.name == 'nt': # Windows return os.path.join(venv_path, "Scripts", "python.exe") else: # Unix/Linux return os.path.join(venv_path, "bin", "python") def _filter_requirements(self, requirements: List[str]) -> List[str]: """Filter out blocked packages and invalid requirements.""" if not requirements: return [] filtered_requirements = [] for req in requirements: # Skip empty, comment lines, or invalid requirements req = req.strip() if not req or req.startswith('#') or '```' in req or req in ['`', '``', '```']: logging.warning(f"Skipping comment or invalid requirement: {req}") continue # Extract package name from requirement spec import re package_name = re.split(r'[=<>]', req)[0].strip().lower() if package_name in self.blocked_packages: logging.warning(f"Blocked package detected: {package_name}") continue filtered_requirements.append(req) return filtered_requirements def _install_packages(self, venv_path: str, requirements: List[str]) -> bool: """Install packages in the virtual environment.""" if not requirements: return True # Filter requirements filtered_requirements = self._filter_requirements(requirements) if not filtered_requirements: logger.info("No allowed packages to install") return True # Get pip executable pip_executable = self._get_pip_executable(venv_path) # Install packages try: logger.info(f"Installing packages: {', '.join(filtered_requirements)}") result = subprocess.run( [pip_executable, "install"] + filtered_requirements, check=True, capture_output=True, text=True, timeout=300 ) logger.info("Package installation successful") return True except subprocess.CalledProcessError as e: logger.error(f"Error during package installation: {e.stderr}") return False except Exception as e: logger.error(f"Error during package installation: {str(e)}") return False def _extract_required_packages(self, code: str) -> List[str]: # Extract required packages from requirements comments in the 1st code line packages = set() # Check for special REQUIREMENTS comment - specific format we're looking for first_lines = code.split('\n')[:5] # Only check first few lines for line in first_lines: if line.strip().startswith("# REQUIREMENTS:"): req_str = line.replace("# REQUIREMENTS:", "").strip() for pkg in req_str.split(','): if pkg.strip(): packages.add(pkg.strip()) return list(packages) def execute_code(self, code: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]: """ Execute Python code in an isolated environment using a simple approach. Args: code: Python code to execute input_data: Optional input data for the code Returns: Dictionary with execution results """ logger.info(f"Executing code with workflow_id: {self.workflow_id}") # Create or reuse virtual environment venv_path = self._create_venv() #creating self.temp_dir! # Create input_data directory for file handling input_data_dir = os.path.join(self.temp_dir, "input_data") # Temp dir is at root os.makedirs(input_data_dir, exist_ok=True) # Extract and install required packages all_requirements = [] # Add explicitly provided requirements # if self.requirements: # all_requirements.extend(self.requirements) # Extract requirements from code extracted_requirements = self._extract_required_packages(code) if extracted_requirements: all_requirements.extend(extracted_requirements) logger.info(f"Extracted required packages from code: {', '.join(extracted_requirements)}") # Install packages if needed if all_requirements: logger.info(f"Installing {len(all_requirements)} packages") install_success = self._install_packages(venv_path, all_requirements) if not install_success: # Return error if package installation failed return { "success": False, "output": "", "error": f"Failed to install required packages: {', '.join(all_requirements)}", "result": None, "exit_code": -1 } # Process extracted document content if available if input_data and "extracted_documents" in input_data: for doc in input_data["extracted_documents"]: doc_name = doc["name"] doc_content = doc["content"] doc_type = doc["type"] # Create file path file_path = os.path.join(input_data_dir, doc_name) try: # Write content to file with open(file_path, 'w', encoding='utf-8') as f: f.write(doc_content) # Add to files list if not already there if "files" not in input_data: input_data["files"] = [] input_data["files"].append({ "id": f"extracted_{doc_name}", "name": doc_name, "type": doc_type, "path": file_path }) logger.info(f"Created file from extracted content: {doc_name}") except Exception as e: logger.error(f"Error creating file from extracted content: {str(e)}") # Copy input files to input_data directory if provided if input_data and "files" in input_data: for file_info in input_data.get("files", []): # Skip files we just created from extracted content if file_info.get("id", "").startswith("extracted_"): continue source_path = file_info.get("path", "") logger.info(f"Attempting to copy file from: {source_path}") logger.info(f"File exists: {os.path.exists(source_path)}") if source_path and os.path.exists(source_path): # Get just the filename file_name = os.path.basename(source_path) # Create destination path in input_data directory dest_path = os.path.join(input_data_dir, file_name) try: # Copy the file shutil.copy2(source_path, dest_path) logger.info(f"Copied file to input_data directory: {dest_path}") except Exception as e: logger.error(f"Error copying file {source_path}: {str(e)}") # Create a file for the code code_id = uuid.uuid4().hex[:8] code_file = os.path.join(self.temp_dir, f"code_{code_id}.py") # Write the code as-is without injecting additional loader code with open(code_file, "w", encoding="utf-8") as f: f.write(code) # Get Python executable python_executable = self._get_python_executable(venv_path) logger.info(f"Using Python executable: {python_executable}") # Execute code try: # Run the code from root dir working_dir = os.path.dirname(code_file) # This should be the project root logger.info(f"DEBUG PATH Root: {os.getcwd()} Code: {code_file} Working Dir: {working_dir}") process = subprocess.run( [python_executable, code_file], timeout=self.timeout, capture_output=True, text=True, cwd=self.temp_dir ) # Process the output stdout = process.stdout stderr = process.stderr # Get result from stdout if available result_data = None if process.returncode == 0 and stdout: try: # Look for the last line that could be JSON for line in reversed(stdout.strip().split('\n')): line = line.strip() if line and line[0] in '{[' and line[-1] in '}]': try: result_data = json.loads(line) # Successfully parsed JSON result, use it break except json.JSONDecodeError: # Not valid JSON, continue to next line continue except Exception as e: logger.warning(f"Failed to parse result from stdout: {str(e)}") # Create result dictionary execution_result = { "success": process.returncode == 0, "output": stdout, "error": stderr if process.returncode != 0 else "", "result": result_data, "exit_code": process.returncode } except subprocess.TimeoutExpired: logger.error(f"Execution timed out after {self.timeout} seconds") execution_result = { "success": False, "output": "", "error": f"Execution timed out (timeout after {self.timeout} seconds)", "result": None, "exit_code": -1 } except Exception as e: logger.error(f"Execution error: {str(e)}") execution_result = { "success": False, "output": "", "error": f"Execution error: {str(e)} for code {code}", "result": None, "exit_code": -1 } # Clean up temporary code file try: if os.path.exists(code_file): os.remove(code_file) except Exception as e: logger.warning(f"Error cleaning up temporary code file: {e}") return execution_result def cleanup(self): """Clean up temporary resources.""" # Skip cleanup for persistent environments if self.is_persistent and self.workflow_id: logger.info(f"Skipping cleanup for persistent environment of workflow {self.workflow_id}") return # Clean up temporary directory if self.temp_dir and os.path.exists(self.temp_dir): try: shutil.rmtree(self.temp_dir) logger.info(f"Deleted temporary directory: {self.temp_dir}") except Exception as e: logger.warning(f"Could not delete temporary directory {self.temp_dir}: {e}") def __del__(self): """Clean up during garbage collection.""" self.cleanup() def get_error_recommendation(error_message: str) -> str: """Generate recommendations based on error message.""" if "ImportError" in error_message or "ModuleNotFoundError" in error_message: return """ ### Recommendation The error indicates a missing Python module. Try using standard libraries or common data analysis modules. """ elif "PermissionError" in error_message: return """ ### Recommendation The code doesn't have the necessary permissions to access files or directories. """ elif "SyntaxError" in error_message: return """ ### Recommendation There's a syntax error in the code. Check for missing parentheses, quotes, colons, or indentation errors. """ elif "FileNotFoundError" in error_message: return """ ### Recommendation A file could not be found. Check the file path and make sure the file exists. """ else: return """ ### Recommendation To fix the error: 1. Check the exact error message 2. Simplify the code and test step by step 3. Use try/except blocks for error-prone operations """ class CoderAgent(BaseAgent): """Agent for developing and executing Python code""" def __init__(self): """Initialize the coder agent with proper type and capabilities""" super().__init__() # Agent metadata self.id = "coder" self.type = "coder" self.name = "Python Code Agent" self.description = "Develops and executes Python code" self.capabilities = "code_development,data_processing,file_processing,automation" self.result_format = "python_code" # Add document capabilities self.supports_documents = True self.document_capabilities = ["read", "reference", "create"] self.required_context = ["workflow_id"] # Initialize protocol self.protocol = AgentCommunicationProtocol() # Init utilities self.file_utils = FileUtils() self.message_utils = MessageUtils() # Executor settings self.executor_timeout = 60 # seconds self.executor_memory_limit = 512 # MB # AI service settings self.ai_temperature = 0.2 # Lower temperature for more deterministic code generation self.ai_max_tokens = 2000 # Enough tokens for complex code def get_agent_info(self) -> Dict[str, Any]: """Get agent information for agent registry""" info = super().get_agent_info() info.update({ "metadata": { "timeout": self.executor_timeout, "memory_limit": self.executor_memory_limit } }) return info async def process_message(self, message: Dict[str, Any], context: Dict[str, Any] = None) -> Dict[str, Any]: """ Process a message to develop and execute Python code. Args: message: The message to process context: Additional context information Returns: Response message """ # Extract workflow_id from context or message workflow_id = context.get("workflow_id") if context else message.get("workflow_id", "unknown") # Get or create logging_utils log_func = context.get("log_func") if context else None logging_utils = LoggingUtils(workflow_id, log_func) # Create response message response = { "role": "assistant", "content": "", "agent_id": self.id, "agent_type": self.type, "agent_name": self.name, "workflow_id": workflow_id, "documents": [] } try: # Extract content and documents content = message.get("content", "") documents = message.get("documents", []) # Extract code from message content code_blocks = re.findall(r'```(?:python)?\s*([\s\S]*?)```', content) code_to_execute = None requirements = [] if code_blocks: # Use the first code block found code_to_execute = code_blocks[0] # Clean the code to remove any markdown formatting code_to_execute = self._clean_code(code_to_execute) logging_utils.info(f"Code extracted from message ({len(code_to_execute)} characters)", "agents") else: # Generate code based on the message content using AI logging_utils.info("No code found in message, generating new code with AI", "agents") # Generate code using AI code_to_execute, requirements = await self._generate_code_from_prompt(content, documents) if not code_to_execute: logging_utils.warning("AI could not generate code", "agents") response["content"] = "I couldn't generate executable code based on your request. Please provide more detailed instructions." self.message_utils.finalize_message(response) return response logging_utils.info(f"Code generated with AI ({len(code_to_execute)} characters)", "agents") # Execute the code if code_to_execute: logging_utils.info("Executing code", "execution") # Prepare execution context execution_context = { "workflow_id": workflow_id, "documents": documents, "message": message, "log_func": log_func } # Send a status update if log_func: status_message = self.protocol.create_status_update_message( status_description="Processing code execution request", sender_id=self.id, status="in_progress", progress=0.5, context_id=workflow_id ) log_func(workflow_id, status_message.content, "info", self.id, self.name) # Execute code result = await self._execute_code(code_to_execute, requirements, execution_context) # Prepare response if result.get("success", False): print("DEBUG-RESULT:",result,"#END\n") # Code execution successful output = result.get("output", "") execution_result = result.get("result") logging_utils.info("Code executed successfully", "execution") # Format response content response_content = f"## Code executed successfully\n\n" # Include the executed code response_content += f"### Executed Code\n\n```python\n{code_to_execute}\n```\n\n" # Include the output if available if output: response_content += f"### Output\n\n```\n{output}\n```\n\n" # Create document data_document = self.create_document_from_result(execution_result) response["documents"].append(data_document) print("DEBUG DOC:",data_document) # Include the execution result if available if execution_result: result_str = json.dumps(execution_result, indent=2) if isinstance(execution_result, (dict, list)) else str(execution_result) response_content += f"### Result\n\n```\n{result_str}\n```\n\n" response["content"] = response_content # Process any files created by the code if isinstance(execution_result, dict) and "created_files" in execution_result: created_files = execution_result.get("created_files", []) for file_info in created_files: file_id = file_info.get("id") if file_id: logging_utils.info(f"Adding created file {file_info.get('name', file_id)} to documents", "files") # Add file document to the response doc = { "id": f"doc_{uuid.uuid4()}", "source": file_info, "type": "file" } response["documents"].append(doc) else: # Code execution failed error = result.get("error", "Unknown error") logging_utils.error(f"Error during code execution: {error}", "execution") # Format error response response_content = f"## Error during code execution\n\n" response_content += f"### Executed Code\n\n```python\n{code_to_execute}\n```\n\n" response_content += f"### Error\n\n```\n{error}\n```\n\n" # Add recommendation based on error response_content += get_error_recommendation(error) response["content"] = response_content else: # No code to execute response["content"] = "I couldn't find or generate executable code. Please provide Python code or explain your requirements more clearly." # Finalize response self.message_utils.finalize_message(response) # Log success logging_utils.info("CoderAgent has successfully processed the request", "agents") print("DEBUG CODE-RESPONSE:",response) return response except Exception as e: error_msg = f"Error during processing by the CoderAgent: {str(e)}" logging_utils.error(error_msg, "error") # Create error response response["content"] = f"## Processing Error\n\n```\n{error_msg}\n\n{traceback.format_exc()}\n```" self.message_utils.finalize_message(response) return response def create_document_from_result(execution_result, output_format="json"): """ Create a document object from execution results Args: execution_result: The data returned from code execution output_format: Desired format (json, csv, etc.) Returns: Document object for passing to other agents """ doc_id = f"data_{uuid.uuid4()}" # Determine filename and content type based on the data if isinstance(execution_result, pd.DataFrame): # Handle DataFrame result filename = "processed_data.csv" content_type = "text/csv" content = execution_result.to_csv(index=False) elif isinstance(execution_result, dict) or isinstance(execution_result, list): # Handle dictionary or list result filename = "processed_data.json" content_type = "application/json" content = json.dumps(execution_result) elif isinstance(execution_result, str): # Try to determine if string is JSON, CSV, or plain text if execution_result.strip().startswith('{') or execution_result.strip().startswith('['): filename = "processed_data.json" content_type = "application/json" elif ',' in execution_result and '\n' in execution_result: filename = "processed_data.csv" content_type = "text/csv" else: filename = "processed_data.txt" content_type = "text/plain" content = str(execution_result) else: # Default case for other types filename = "processed_data.txt" content_type = "text/plain" content = str(execution_result) # Create document object document = { "id": doc_id, "source": { "type": "generated", "id": doc_id, "name": filename, "content_type": content_type, }, "contents": [{ "type": "text", "text": content, "is_extracted": True }] } return document def _clean_code(self, code: str) -> str: """ Clean up code by removing markdown code block markers and other formatting artifacts. Args: code: The code string to clean Returns: Cleaned code string """ import re # Remove code block markers at beginning/end code = re.sub(r'^```(?:python)?\s*', '', code) code = re.sub(r'```\s*$', '', code) # Remove any trailing markdown code blocks that might have been added by the AI lines = code.split('\n') clean_lines = [] # Flag to track if we're in a trailing markdown section in_trailing_markdown = False for line in reversed(lines): stripped = line.strip() # Check if this line contains only backticks (``` or ` or ``) if re.match(r'^`{1,3}$', stripped): in_trailing_markdown = True continue # Check if this is a markdown comment or note if in_trailing_markdown and (stripped.startswith('#') or stripped.lower().startswith('note:') or stripped.lower().startswith('example:')): continue # If we've reached actual code, stop considering trailing markdown if stripped and not in_trailing_markdown: in_trailing_markdown = False # Add this line if it's not part of trailing markdown if not in_trailing_markdown: clean_lines.insert(0, line) # Join the lines back together clean_code = '\n'.join(clean_lines) # Final cleanup for any stray backticks clean_code = re.sub(r'`{1,3}\s*$', '', clean_code) return clean_code.strip() async def _generate_code_from_prompt(self, prompt: str, documents: List[Dict[str, Any]]) -> Tuple[str, List[str]]: """ Generate Python code from a prompt using AI service. Args: prompt: The prompt to generate code from documents: Documents associated with the prompt Returns: Tuple of (generated Python code, required packages) """ try: # Initialize AI service chat_service = ChatService() # Prepare a prompt for code generation ai_prompt = f"""Generate Python code to solve the following task: {prompt} Available input files: """ # Add information about available documents if documents: for i, doc in enumerate(documents): source = doc.get("source", {}) doc_name = source.get("name", f"Document {i+1}") doc_type = source.get("content_type", "unknown") doc_id = source.get("id", "") ai_prompt += f"- {doc_name} (type: {doc_type}, id: {doc_id}, path: './input_data/{doc_name}')\n" else: ai_prompt += "No input files available.\n" ai_prompt += """ IMPORTANT REQUIREMENTS: 1. Your code MUST define a 'result' variable to store the final output of your code. 2. At the end of your script, it should print or output the result variable. 3. Make your 'result' variable a dictionary or another JSON-serializable data structure that contains all relevant output. 4. Input files are accessible in the './input_data/' directory. 5. Keep code well-documented with comments explaining key operations. 6. Make your code complete and self-contained. 7. Include proper error handling. FORMAT INSTRUCTIONS: - Provide ONLY the Python code without ANY introduction, explanation, or conclusion text - DO NOT include code block markers like ```python or ``` - DO NOT explain what the code does before or after it - DO NOT include any text that is not valid Python code - Start your response directly with valid Python code - End your response with valid Python code For required packages, place them in a specially formatted comment at the top of your code one one line like this: # REQUIREMENTS: pandas,numpy,matplotlib,requests Your entire response must be valid Python that can be executed without modification. """ # Create messages for the API messages = [ {"role": "system", "content": "You are a Python code generator that provides ONLY clean, executable Python code without any explanations, markdown formatting, or non-code text. Your response should be nothing but valid Python code that can be executed directly."}, {"role": "user", "content": ai_prompt} ] # Call the API logging.info(f"Calling AI API to generate code") generated_content = await chat_service.call_api(messages, temperature=self.ai_temperature, max_tokens=self.ai_max_tokens) # Clean the generated content to ensure it's only valid Python code code = self._clean_code(generated_content) # Extract requirements from special comment at the top of the code requirements = [] for line in code.split('\n'): if line.strip().startswith("# REQUIREMENTS:"): req_str = line.replace("# REQUIREMENTS:", "").strip() requirements = [r.strip() for r in req_str.split(',') if r.strip()] break return code, requirements except Exception as e: logging.error(f"Error generating code with AI: {str(e)}", exc_info=True) # Return basic error handling code and no requirements error_str = str(e).replace('"', '\\"') return f""" # Error during code generation print(f"An error occurred during code generation: {error_str}") # Return an error result result = {{"error": "Code generation failed", "message": "{error_str}"}} """, [] async def _execute_code(self, code: str, requirements: List[str] = None, context: Dict[str, Any] = None) -> Dict[str, Any]: """ Execute Python code using the SimpleCodeExecutor. Args: code: The Python code to execute requirements: List of required packages context: Additional context for execution Returns: Result of code execution """ # Get workflow ID and set up logging workflow_id = context.get("workflow_id", "") if context else "" logging_utils = None if "log_func" in context and workflow_id: logging_utils = LoggingUtils(workflow_id, context.get("log_func")) if logging_utils: logging_utils.info("Executing Python code", "execution") if requirements: logging_utils.info(f"Required packages: {', '.join(requirements)}", "execution") try: # List of blocked packages for security blocked_packages = [ "cryptography", "flask", "django", "tornado", # Security risks "tensorflow", "pytorch", "scikit-learn" # Resource intensive ] # Initialize SimpleCodeExecutor with requirements and workflow_id for persistence executor = SimpleCodeExecutor( workflow_id=workflow_id, timeout=self.executor_timeout, max_memory_mb=self.executor_memory_limit, requirements=requirements, blocked_packages=blocked_packages ) # Prepare input data for the code input_data = {"context": context, "workflow_id": workflow_id} # Add file references if available if context and "documents" in context: input_data["files"] = [ { "id": doc.get("source", {}).get("id", ""), "name": doc.get("source", {}).get("name", ""), "type": doc.get("source", {}).get("content_type", ""), "path": doc.get("source", {}).get("path", "") # Full file path } for doc in context.get("documents", []) if doc.get("source", {}).get("type") == "file" ] # Extract document content from message but don't create files yet if context and "message" in context and "content" in context["message"]: message_content = context["message"]["content"] # Check if there's extracted document content if "=== EXTRACTED DOCUMENT CONTENT ===" in message_content: # Add a special field to input_data for extracted content input_data["extracted_documents"] = [] # Split by the document marker pattern pattern = r"--- (.*?) ---\s*" import re doc_sections = re.split(pattern, message_content) # Skip the first section (before any "--- doc ---" marker) for i in range(1, len(doc_sections), 2): if i+1 < len(doc_sections): doc_name = doc_sections[i].strip() doc_content = doc_sections[i+1].strip() # Store the extracted content to be processed by the executor input_data["extracted_documents"].append({ "name": doc_name, "content": doc_content, "type": "text/csv" if doc_name.endswith(".csv") else "text/plain" }) if logging_utils: logging_utils.info(f"Extracted document content: {doc_name}", "execution") # Execute the code if logging_utils: logging_utils.info(f"Executing code with input data containing {len(input_data.get('files', []))} files", "execution") result = executor.execute_code(code, input_data) # Log the execution results if logging_utils: if result.get("success", False): logging_utils.info("Code executed successfully", "execution") # Log a preview of the output output = result.get("output", "") if output: preview = output[:1000] + "..." if len(output) > 1000 else output logging_utils.info(f"Output preview: {preview}", "execution") # Log a preview of the result execution_result = result.get("result") if execution_result: if isinstance(execution_result, (dict, list)): result_str = json.dumps(execution_result, indent=2) preview = result_str[:1000] + "..." if len(result_str) > 1000 else result_str else: str_result = str(execution_result) preview = str_result[:1000] + "..." if len(str_result) > 1000 else str_result logging_utils.info(f"Result preview: {preview}", "execution") else: # Log error information error = result.get("error", "Unknown error") logging_utils.error(f"Error during code execution: {error}", "execution") print("DEBUG CODE-ERROR:",code,"#END") # Clean up non-persistent environments if not executor.is_persistent: executor.cleanup() return result except Exception as e: error_message = f"Error during code execution: {str(e)}\n{traceback.format_exc()}" if logging_utils: logging_utils.error(error_message, "error") return { "success": False, "output": "", "error": error_message, "result": None } def send_error_message(self, error_description: str, sender_id: str, receiver_id: str = None, context_id: str = None) -> AgentMessage: """Send an error message using the protocol""" return self.protocol.create_error_message( error_description=error_description, sender_id=sender_id, receiver_id=receiver_id, error_type="code_execution", context_id=context_id ) def send_result_message(self, result_content: str, sender_id: str, receiver_id: str, task_id: str, output_data: Dict[str, Any] = None, context_id: str = None) -> AgentMessage: """Send a result message using the protocol""" return self.protocol.create_result_message( result_content=result_content, sender_id=sender_id, receiver_id=receiver_id, task_id=task_id, output_data=output_data, result_format="python_code", context_id=context_id ) # Singleton instance _coder_agent = None def get_coder_agent(): """Returns a singleton instance of the Coder Agent""" global _coder_agent if _coder_agent is None: _coder_agent = CoderAgent() return _coder_agent