gateway/modules/chat_agent_coder.py
2025-04-20 23:53:37 +02:00

810 lines
No EOL
30 KiB
Python

"""
Coder agent for development and execution of Python code.
Optimized for the new task-based processing.
"""
import logging
import json
import re
import uuid
import os
import subprocess
import tempfile
import shutil
import sys
from typing import Dict, Any, List, Optional, Tuple
from modules.chat_registry import AgentBase
logger = logging.getLogger(__name__)
class AgentCoder(AgentBase):
"""Agent for development and execution of Python code"""
def __init__(self):
"""Initialize the coder agent"""
super().__init__()
self.name = "coder"
self.description = "Develops and executes Python code for data processing and automation"
self.capabilities = [
"code_development",
"data_processing",
"file_processing",
"automation",
"code_execution"
]
# Executor settings
self.executor_timeout = 60 # seconds
self.executor_memory_limit = 512 # MB
# AI service settings
self.ai_temperature = 0.1 # Lower temperature for deterministic code generation
# Auto-correction settings
self.max_correction_attempts = 3 # Maximum number of correction attempts
async def process_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""
Process a standardized task structure and perform code development/execution.
Args:
task: A dictionary containing:
- task_id: Unique ID for this task
- prompt: The main instruction for the agent
- input_documents: List of documents to process
- output_specifications: List of required output documents
- context: Additional contextual information
Returns:
A dictionary containing:
- feedback: Text response explaining the code execution
- documents: List of created document objects
"""
try:
# Extract relevant task information
prompt = task.get("prompt", "")
input_documents = task.get("input_documents", [])
output_specs = task.get("output_specifications", [])
context_info = task.get("context", {})
# Check if AI service is available
if not self.ai_service:
logger.error("No AI service configured for the Coder agent")
return {
"feedback": "The Coder agent is not properly configured.",
"documents": []
}
# Extract context from input documents
document_context = self._extract_document_context(input_documents)
# Generate code based on the prompt and document context
logger.info("Generating code based on the task")
code_to_execute, requirements = await self._generate_code_from_prompt(prompt, document_context)
if not code_to_execute:
logger.warning("AI couldn't generate any code")
return {
"feedback": "I couldn't generate executable code based on the task. Please provide more detailed instructions.",
"documents": []
}
logger.info(f"Code generated with AI ({len(code_to_execute)} characters)")
# Collect created documents
generated_documents = []
# Add code as first document
code_doc = {
"label": "generated_code.py",
"content": code_to_execute
}
generated_documents.append(code_doc)
# Execute code with auto-correction loop
execution_context = {
"input_documents": input_documents,
"task": task
}
# Enhanced execution with auto-correction
result, attempts_info = await self._execute_with_auto_correction(
code_to_execute,
requirements,
execution_context,
prompt # Original prompt/message
)
# Create output documents based on execution result and output specifications
if result.get("success", False):
# Code execution successful
output = result.get("output", "")
execution_result = result.get("result")
logger.info("Code executed successfully")
# Determine output type of the result
result_docs = self._generate_result_documents(
attempts_info[-1]["code"], # Last successful code
output,
execution_result,
output_specs
)
# Add result documents
generated_documents.extend(result_docs)
# Create feedback for successful execution
feedback = f"I successfully executed the code and generated {len(result_docs)} output files."
if attempts_info and len(attempts_info) > 1:
feedback += f" (This required {len(attempts_info)-1} correction attempts)"
else:
# Code execution failed after all attempts
error = result.get("error", "Unknown error")
logger.error(f"Error in code execution after all correction attempts: {error}")
# Add error log as additional document
error_doc = {
"label": "execution_error.txt",
"content": f"Execution error:\n\n{error}"
}
generated_documents.append(error_doc)
# Create feedback for failed execution
feedback = f"An error occurred during code execution after {len(attempts_info)} correction attempts."
# If no specific outputs requested, create standard outputs
if not output_specs and result.get("success", False):
# Add standard output document
output_doc = {
"label": "execution_output.txt",
"content": output
}
generated_documents.append(output_doc)
# If a result is available, also add as JSON document
if execution_result:
result_json = json.dumps(execution_result, indent=2) if isinstance(execution_result, (dict, list)) else str(execution_result)
result_doc = {
"label": "execution_result.json",
"content": result_json
}
generated_documents.append(result_doc)
return {
"feedback": feedback,
"documents": generated_documents
}
except Exception as e:
error_msg = f"Error during processing by the Coder agent: {str(e)}"
logger.error(error_msg)
return {
"feedback": f"An error occurred during code processing: {str(e)}",
"documents": []
}
def _extract_document_context(self, documents: List[Dict[str, Any]]) -> str:
"""
Extract context from input documents for code generation.
Args:
documents: List of document objects
Returns:
Extracted context as text
"""
context_parts = []
for doc in documents:
doc_name = doc.get("name", "Unnamed document")
context_parts.append(f"--- {doc_name} ---")
for content in doc.get("contents", []):
if content.get("metadata", {}).get("is_text", False):
context_parts.append(content.get("data", ""))
return "\n\n".join(context_parts)
def _generate_result_documents(self, code: str, output: str, execution_result: Any,
output_specs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Generate output documents based on execution results and specifications.
Args:
code: Executed code
output: Text output of the execution
execution_result: Result object from execution
output_specs: Output specifications
Returns:
List of generated document objects
"""
documents = []
# If no specific outputs requested
if not output_specs:
return documents
# Generate appropriate document for each requested output
for spec in output_specs:
output_label = spec.get("label", "")
output_description = spec.get("description", "")
# Determine output type based on file extension
format_type = self._determine_format_type(output_label)
# Generate document content based on format and output
if "code" in output_label.lower() or format_type in ["py", "js", "html", "css"]:
# Code document
documents.append({
"label": output_label,
"content": code
})
elif "output" in output_label.lower() or format_type == "txt":
# Output document
documents.append({
"label": output_label,
"content": output
})
elif format_type in ["json", "yml", "yaml"] and execution_result:
# JSON result document
if isinstance(execution_result, (dict, list)):
content = json.dumps(execution_result, indent=2)
else:
content = str(execution_result)
documents.append({
"label": output_label,
"content": content
})
else:
# Generic result document (fallback)
result_str = ""
if execution_result:
if isinstance(execution_result, (dict, list)):
result_str = json.dumps(execution_result, indent=2)
else:
result_str = str(execution_result)
documents.append({
"label": output_label,
"content": f"Code output:\n\n{output}\n\nResult:\n\n{result_str}"
})
return documents
def _determine_format_type(self, output_label: str) -> str:
"""
Determine the format type based on the filename.
Args:
output_label: Output filename
Returns:
Format type (py, js, json, txt, etc.)
"""
if not '.' in output_label:
return "txt" # Default format
extension = output_label.split('.')[-1].lower()
return extension
async def _execute_with_auto_correction(
self,
initial_code: str,
requirements: List[str],
context: Dict[str, Any],
original_prompt: str
) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]:
"""
Execute code with automatic error correction and retry attempts.
Args:
initial_code: The initial Python code
requirements: List of required packages
context: Additional context for execution
original_prompt: The original user request/prompt
Returns:
Tuple of (final execution result, list of attempt info dictionaries)
"""
# Initialize tracking data
current_code = initial_code
current_requirements = requirements.copy() if requirements else []
attempts_info = []
# Execute with correction loop
for attempt in range(1, self.max_correction_attempts + 1):
if attempt == 1:
logger.info(f"Executing code (attempt {attempt}/{self.max_correction_attempts})")
else:
logger.info(f"Executing corrected code (attempt {attempt}/{self.max_correction_attempts})")
# Execute current code version
result = await self._execute_code(current_code, current_requirements, context)
# Record attempt information
attempts_info.append({
"attempt": attempt,
"code": current_code,
"error": result.get("error", ""),
"success": result.get("success", False)
})
# Check if execution was successful
if result.get("success", False):
# Success! Return result and attempt info
return result, attempts_info
# Failed execution - check if max attempt limit reached
if attempt >= self.max_correction_attempts:
logger.warning(f"Maximum correction attempts ({self.max_correction_attempts}) reached")
break
# Correct code based on the error
error_message = result.get("error", "Unknown error")
logger.info(f"Attempting to fix code error: {error_message[:200]}...")
# Generate corrected code
corrected_code, new_requirements = await self._generate_code_correction(
current_code,
error_message,
original_prompt,
current_requirements
)
# Update for next attempt
if corrected_code:
current_code = corrected_code
# Add new requirements
if new_requirements:
for req in new_requirements:
if req not in current_requirements:
current_requirements.append(req)
logger.info(f"Added new requirement: {req}")
else:
# Correction couldn't be generated, end loop
logger.warning("Couldn't generate code correction")
break
# If we reach here, all attempts failed - return last result and attempt info
return result, attempts_info
async def _generate_code_correction(
self,
code: str,
error_message: str,
original_prompt: str,
current_requirements: List[str] = None
) -> Tuple[str, List[str]]:
"""
Generate a corrected version of code based on error messages.
Args:
code: The code that generated errors
error_message: The error message to fix
original_prompt: The original task/requirements
current_requirements: List of currently required packages
Returns:
Tuple of (corrected code, new requirements list)
"""
try:
# Create detailed prompt for code correction
correction_prompt = f"""You need to fix an error in Python code. The code was written for this task:
ORIGINAL TASK:
{original_prompt}
CURRENT CODE:
```python
{code}
```
ERROR MESSAGE:
```
{error_message}
```
CURRENT REQUIREMENTS: {', '.join(current_requirements) if current_requirements else "None"}
Your task is to analyze the error and provide a corrected version of the code.
Focus specifically on fixing the error while maintaining the original functionality.
Common fixes include:
- Fixing syntax errors (missing parentheses, indentation, etc.)
- Solving import errors by adding appropriate requirements
- Correcting file paths or handling "file not found" errors
- Adding error handling for specific edge cases
- Fixing logical errors in the code
FORMATTING GUIDELINES:
1. Provide ONLY the complete corrected Python code WITHOUT explanations
2. Do NOT use code block markers like ```python or ```
3. Do NOT explain what the code does before or after
4. Do NOT add any text that isn't valid Python code
5. Start your answer directly with valid Python code
6. End your answer with valid Python code
If you need to add new required packages, place them in a specially formatted comment at the beginning of your code as follows:
# REQUIREMENTS: package1,package2,package3
Your entire answer must be valid Python that can be executed without modifications.
"""
# Create messages for API
messages = [
{"role": "system", "content": "You are a Python debugging expert. You provide ONLY clean, error-free Python code, without explanations, markdown formatting, or text that isn't code."},
{"role": "user", "content": correction_prompt}
]
# Call API with very low temperature for deterministic corrections
generated_content = await self.ai_service.call_api(
messages,
temperature=0.1
)
# Clean up the generated content to ensure it's only valid Python code
fixed_code = self._clean_code(generated_content)
# Extract requirements from special comment at beginning of code
new_requirements = []
for line in fixed_code.split('\n'):
if line.strip().startswith("# REQUIREMENTS:"):
req_str = line.replace("# REQUIREMENTS:", "").strip()
new_requirements = [r.strip() for r in req_str.split(',') if r.strip()]
break
return fixed_code, new_requirements
except Exception as e:
logging.error(f"Error generating code correction: {str(e)}")
# Return None to indicate failure
return None, []
def _clean_code(self, code: str) -> str:
"""
Clean code by removing markdown code block markers and other formatting artifacts.
Args:
code: The code string to clean
Returns:
Cleaned code string
"""
# Remove code block markers at beginning/end
code = re.sub(r'^```(?:python)?\s*', '', code)
code = re.sub(r'```\s*$', '', code)
# Process lines in reverse order to start from the end
lines = code.split('\n')
clean_lines = []
in_trailing_markdown = False
for line in reversed(lines):
stripped = line.strip()
# Check if this line contains only backticks (``` or ` or ``)
if re.match(r'^`{1,3}$', stripped):
in_trailing_markdown = True
continue
# If we've reached actual code, no more trailing markdown consideration
if stripped and not in_trailing_markdown:
in_trailing_markdown = False
# Add this line if it's not part of trailing markdown
if not in_trailing_markdown:
clean_lines.insert(0, line)
# Rejoin lines
clean_code = '\n'.join(clean_lines)
# Final cleanup for any remaining backticks
clean_code = re.sub(r'`{1,3}\s*', '', clean_code)
return clean_code.strip()
async def _generate_code_from_prompt(self, prompt: str, document_context: str) -> Tuple[str, List[str]]:
"""
Generate Python code from a prompt using the AI service.
Args:
prompt: The prompt to generate code from
document_context: Context extracted from documents
Returns:
Tuple of (generated Python code, required packages)
"""
try:
# Prepare prompt for code generation
ai_prompt = f"""Generate Python code to solve the following task:
TASK:
{prompt}
PROVIDED CONTEXT:
{document_context if document_context else "No additional context available."}
IMPORTANT REQUIREMENTS:
1. Your code MUST define a 'result' variable to store the final result.
2. At the end of your script, the result variable should be output.
3. Make your 'result' variable a dictionary or other JSON-serializable data structure containing all relevant outputs.
4. Comment your code well to explain important operations.
5. Make your code complete and self-contained.
6. Add appropriate error handling.
FORMATTING INSTRUCTIONS:
- Return ONLY the Python code, WITHOUT introduction, explanation, or conclusion text
- Do NOT use code block markers like ```python or ```
- Do NOT explain what the code does before or after
- Do NOT add any text that isn't valid Python code
- Start your answer directly with valid Python code
- End your answer with valid Python code
For required packages, place them in a specially formatted comment at the beginning of your code in one line as follows:
# REQUIREMENTS: pandas,numpy,matplotlib,requests
Your entire answer must be valid Python that can be executed without modifications.
"""
# Create messages for API
messages = [
{"role": "system", "content": "You are a Python code generator who provides ONLY clean, executable Python code with no explanations, markdown formatting, or non-code text."},
{"role": "user", "content": ai_prompt}
]
# Call API
logging.info(f"Calling AI API to generate code")
generated_content = await self.ai_service.call_api(messages, temperature=self.ai_temperature)
# Clean up the generated content to ensure it's only valid Python code
code = self._clean_code(generated_content)
# Extract requirements from special comment at beginning of code
requirements = []
for line in code.split('\n'):
if line.strip().startswith("# REQUIREMENTS:"):
req_str = line.replace("# REQUIREMENTS:", "").strip()
requirements = [r.strip() for r in req_str.split(',') if r.strip()]
break
return code, requirements
except Exception as e:
logging.error(f"Error generating code with AI: {str(e)}")
# Return basic error handling code and no requirements
error_str = str(e).replace('"', '\\"')
return f"""
# Error in code generation
print(f"An error occurred during code generation: {error_str}")
# Return error result
result = {{"error": "Code generation failed", "message": "{error_str}"}}
""", []
async def _execute_code(self, code: str, requirements: List[str] = None, context: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute Python code in an isolated environment.
Args:
code: The Python code to execute
requirements: List of required packages
context: Additional context for execution
Returns:
Result of code execution
"""
# Use virtual code executor for isolated execution
try:
executor = SimpleCodeExecutor(
timeout=self.executor_timeout,
max_memory_mb=self.executor_memory_limit,
requirements=requirements,
ai_service=self.ai_service
)
# Prepare input data for the code
input_data = {"context": context} if context else {}
# Execute code
result = executor.execute_code(code, input_data)
# Clean up environment
executor.cleanup()
return result
except Exception as e:
error_message = f"Error during code execution: {str(e)}"
logger.error(error_message)
return {
"success": False,
"output": "",
"error": error_message,
"result": None
}
class SimpleCodeExecutor:
"""
A simplified executor that runs Python code in isolated virtual environments.
"""
def __init__(self,
timeout: int = 30,
max_memory_mb: int = 512,
requirements: List[str] = None,
ai_service = None):
"""
Initialize the SimpleCodeExecutor.
Args:
timeout: Maximum execution time in seconds
max_memory_mb: Maximum memory in MB
requirements: List of packages to install
ai_service: Optional - AI service for further processing
"""
self.timeout = timeout
self.max_memory_mb = max_memory_mb
self.temp_dir = None
self.requirements = requirements or []
self.blocked_packages = [
"cryptography", "flask", "django", "tornado", # Security risks
"tensorflow", "pytorch", "scikit-learn" # Resource-intensive packages
]
self.ai_service = ai_service
def _create_venv(self) -> str:
"""Create a virtual environment and return the path."""
# Create new environment
venv_parent_dir = tempfile.mkdtemp(prefix="code_exec_")
self.temp_dir = venv_parent_dir
venv_path = os.path.join(venv_parent_dir, "venv")
try:
# Create virtual environment
subprocess.run([sys.executable, "-m", "venv", venv_path],
check=True,
capture_output=True)
return venv_path
except subprocess.CalledProcessError as e:
logger.error(f"Error creating virtual environment: {e}")
raise RuntimeError(f"Virtual environment could not be created: {e}")
def _get_python_executable(self, venv_path: str) -> str:
"""Return the path to the Python executable in the virtual environment."""
if os.name == 'nt': # Windows
return os.path.join(venv_path, "Scripts", "python.exe")
else: # Unix/Linux
return os.path.join(venv_path, "bin", "python")
def execute_code(self, code: str, input_data: Dict[str, Any] = None) -> Dict[str, Any]:
"""
Execute Python code in an isolated environment.
Args:
code: Python code to execute
input_data: Optional input data for the code
Returns:
Dictionary with execution results
"""
logger.info("Executing code in isolated environment")
# Create virtual environment
venv_path = self._create_venv()
# Create file for the code
code_id = uuid.uuid4().hex[:8]
code_file = os.path.join(self.temp_dir, f"code_{code_id}.py")
# Write code
with open(code_file, "w", encoding="utf-8") as f:
f.write(code)
# Get Python executable
python_executable = self._get_python_executable(venv_path)
logger.info(f"Using Python executable: {python_executable}")
# Execute code
try:
# Execute code from root directory
working_dir = os.path.dirname(code_file)
process = subprocess.run(
[python_executable, code_file],
timeout=self.timeout,
capture_output=True,
text=True,
cwd=working_dir
)
# Process output
stdout = process.stdout
stderr = process.stderr
# Get result from stdout if available
result_data = None
if process.returncode == 0 and stdout:
try:
# Look for the last line that could be JSON
for line in reversed(stdout.strip().split('\n')):
line = line.strip()
if line and line[0] in '{[' and line[-1] in '}]':
try:
result_data = json.loads(line)
# Use successfully parsed JSON result
break
except json.JSONDecodeError:
# Not valid JSON, continue with next line
continue
except Exception as e:
logger.warning(f"Error parsing result from stdout: {str(e)}")
# Create result dictionary
execution_result = {
"success": process.returncode == 0,
"output": stdout,
"error": stderr if process.returncode != 0 else "",
"result": result_data,
"exit_code": process.returncode
}
except subprocess.TimeoutExpired:
logger.error(f"Execution timed out after {self.timeout} seconds")
execution_result = {
"success": False,
"output": "",
"error": f"Execution timed out (timeout after {self.timeout} seconds)",
"result": None,
"exit_code": -1
}
except Exception as e:
logger.error(f"Execution error: {str(e)}")
execution_result = {
"success": False,
"output": "",
"error": f"Execution error: {str(e)}",
"result": None,
"exit_code": -1
}
# Clean up temporary code file
try:
if os.path.exists(code_file):
os.remove(code_file)
except Exception as e:
logger.warning(f"Error cleaning up temporary code file: {e}")
return execution_result
def cleanup(self):
"""Clean up temporary resources."""
# Clean up temporary directory
if self.temp_dir and os.path.exists(self.temp_dir):
try:
shutil.rmtree(self.temp_dir)
logger.info(f"Temporary directory deleted: {self.temp_dir}")
except Exception as e:
logger.warning(f"Temporary directory {self.temp_dir} could not be deleted: {e}")
def __del__(self):
"""Cleanup during garbage collection."""
self.cleanup()
# Factory function for the Coder agent
def get_coder_agent():
"""
Factory function that returns an instance of the Coder agent.
Returns:
An instance of the Coder agent
"""
return AgentCoder()