# File: gateway/modules/services/serviceGeneration/paths/codePath.py
# Listing metadata: 584 lines, 22 KiB, Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Code Generation Path
Handles code generation with multi-file project support, dependency handling,
and proper cross-file references.
"""
import json
import logging
import time
import re
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CodeGenerationPath:
    """Code generation path.

    Turns a user prompt into one or more generated source files in three
    phases: structure planning, per-file content generation (in dependency
    order) and a light clean-up pass. AI calls and progress reporting go
    through the injected ``services`` object (``services.ai``,
    ``services.chat`` and ``services.workflow`` are the attributes used here).
    """

    # Maps a normalized output format (lower-case, no leading dot) to the
    # language name used in prompts and in dependency-file selection.
    _OUTPUT_FORMAT_LANGUAGES = {
        "py": "python",
        "python": "python",
        "js": "javascript",
        "javascript": "javascript",
        "ts": "typescript",
        "typescript": "typescript",
        "html": "html",
    }

    # Node.js core modules; they ship with the runtime and must not be listed
    # as npm dependencies.
    _NODE_BUILTIN_MODULES = frozenset({
        "assert", "async_hooks", "buffer", "child_process", "cluster",
        "console", "constants", "crypto", "dgram", "diagnostics_channel",
        "dns", "domain", "events", "fs", "http", "http2", "https",
        "inspector", "module", "net", "os", "path", "perf_hooks", "process",
        "punycode", "querystring", "readline", "repl", "stream",
        "string_decoder", "timers", "tls", "trace_events", "tty", "url",
        "util", "v8", "vm", "wasi", "worker_threads", "zlib",
    })

    # Module specifier of: `... from 'x'`, `import 'x'`, `import('x')`, `require('x')`.
    _JS_SPECIFIER_PATTERN = re.compile(
        r"""(?:\bfrom\s*|\bimport\s*\(?\s*|\brequire\s*\(\s*)['"]([^'"]+)['"]"""
    )
    # `from package.sub import name` (relative imports start with "." and do not match).
    _PY_FROM_IMPORT_PATTERN = re.compile(r"^from\s+([A-Za-z_][\w.]*)\s+import\b")
    # `import a, b.c as d`
    _PY_PLAIN_IMPORT_PATTERN = re.compile(r"^import\s+(.+)$")
    # A Markdown fence that opens at the very start / closes at the very end of a string.
    _FENCE_OPEN_PATTERN = re.compile(r"\A```[^\n`]*\n")
    _FENCE_CLOSE_PATTERN = re.compile(r"(?:\A|\n)```[ \t\r]*\Z")

    def __init__(self, services):
        """Store the service container used for AI calls and progress logging.

        Args:
            services: Object exposing ``workflow``, ``chat`` and ``ai``.
        """
        self.services = services

    async def generateCode(
        self,
        userPrompt: str,
        outputFormat: Optional[str] = None,
        contentParts: Optional[List["ContentPart"]] = None,
        title: str = "Generated Code",
        parentOperationId: Optional[str] = None
    ) -> "AiResponse":
        """
        Generate code files with multi-file project support.

        Args:
            userPrompt: Natural-language description of the code to generate.
            outputFormat: Optional file-extension hint (e.g. "py", "js").
            contentParts: Optional content parts forwarded to the structure
                generation call.
            title: Title stored in the response metadata.
            parentOperationId: Optional parent operation for progress logging.

        Returns: AiResponse with code files as documents

        Raises:
            Exception: Any error from the generation phases is logged, the
                progress log is finished as failed, and the error is re-raised.
        """
        # Create operation ID
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        codeOperationId = f"code_gen_{workflowId}_{int(time.time())}"

        # Start progress tracking
        self.services.chat.progressLogStart(
            codeOperationId,
            "Code Generation",
            "Code Generation",
            f"Format: {outputFormat or 'txt'}",
            parentOperationId=parentOperationId
        )

        try:
            # Detect language from prompt or outputFormat. The detected project
            # type is currently not consumed by the later phases.
            language, _projectType = self._detectLanguageAndProjectType(userPrompt, outputFormat)

            # Phase 1: Code structure generation (with looping)
            self.services.chat.progressLogUpdate(codeOperationId, 0.2, "Generating code structure")
            codeStructure = await self._generateCodeStructure(
                userPrompt=userPrompt,
                language=language,
                outputFormat=outputFormat,
                contentParts=contentParts
            )

            # Phase 2: Code content generation (with dependency handling)
            self.services.chat.progressLogUpdate(codeOperationId, 0.5, "Generating code content")
            codeFiles = await self._generateCodeContent(codeStructure, codeOperationId)

            # Phase 3: Code formatting & validation
            self.services.chat.progressLogUpdate(codeOperationId, 0.9, "Formatting code files")
            formattedFiles = await self._formatAndValidateCode(codeFiles)

            # Convert to unified document format
            documents = []
            for file in formattedFiles:
                # `or` (not a .get default) so an explicit null fileType also falls back
                mimeType = self._getMimeType(file.get("fileType") or outputFormat or "txt")
                content = file.get("content", "")
                if content is None:
                    # A JSON null content becomes an empty document instead of None bytes
                    content = ""
                if isinstance(content, str):
                    contentBytes = content.encode('utf-8')
                else:
                    contentBytes = content
                documents.append(DocumentData(
                    documentName=file.get("filename", "generated.txt"),
                    documentData=contentBytes,
                    mimeType=mimeType,
                    sourceJson=file
                ))

            metadata = AiResponseMetadata(
                title=title,
                operationType=OperationTypeEnum.DATA_GENERATE.value
            )
            self.services.chat.progressLogFinish(codeOperationId, True)
            return AiResponse(
                documents=documents,
                content=None,
                metadata=metadata
            )
        except Exception as e:
            # logger.exception keeps the traceback alongside the message
            logger.exception("Error in code generation: %s", e)
            self.services.chat.progressLogFinish(codeOperationId, False)
            raise

    def _detectLanguageAndProjectType(self, userPrompt: str, outputFormat: Optional[str]) -> tuple:
        """Detect programming language and project type from prompt or format.

        The output format wins when it is recognised (case-insensitive, an
        optional leading dot is ignored); otherwise keywords and file
        extensions in the prompt are used, defaulting to "python".

        Returns:
            ``(language, projectType)`` where projectType is "single_file" or
            "multi_file".
        """
        promptLower = (userPrompt or "").lower()

        # Detect language
        language = None
        if outputFormat:
            normalizedFormat = outputFormat.strip().lower().lstrip(".")
            language = self._OUTPUT_FORMAT_LANGUAGES.get(normalizedFormat)

        if not language:
            # Extensions are matched with a word boundary so that ".json"
            # is not mistaken for ".js".
            if "python" in promptLower or re.search(r"\.py\b", promptLower):
                language = "python"
            elif "javascript" in promptLower or re.search(r"\.js\b", promptLower):
                language = "javascript"
            elif "typescript" in promptLower or re.search(r"\.ts\b", promptLower):
                language = "typescript"
            elif "html" in promptLower:
                language = "html"
            else:
                language = "python"  # Default

        # Detect project type (plain substring heuristic)
        projectType = "single_file"
        if "multi" in promptLower or "multiple files" in promptLower or "project" in promptLower:
            projectType = "multi_file"
        return language, projectType

    async def _generateCodeStructure(
        self,
        userPrompt: str,
        language: str,
        outputFormat: Optional[str],
        contentParts: Optional[List["ContentPart"]]
    ) -> Dict[str, Any]:
        """Generate code structure using looping system.

        Sends a planning prompt through ``services.ai.callAiWithLooping`` and
        parses the result with ``json.loads``.

        Returns:
            The parsed structure (expected keys: "metadata" and "files").

        Raises:
            ValueError: If the response is not valid JSON (json.JSONDecodeError
                is a ValueError) or is not a JSON object.
        """
        # Build structure generation prompt
        structurePrompt = f"""Analyze the following code generation request and create a project structure.
Request: {userPrompt}
Language: {language}
Create a JSON structure with:
1. metadata: {{"language": "{language}", "projectType": "single_file|multi_file", "projectName": "..."}}
2. files: Array of file structures, each with:
- id: Unique identifier
- filename: File name (e.g., "main.py", "utils.py")
- fileType: File extension (e.g., "py", "js")
- dependencies: List of file IDs this file depends on (for multi-file projects)
- imports: List of import statements (for dependency extraction)
- functions: Array of function signatures {{"name": "...", "signature": "..."}}
- classes: Array of class definitions {{"name": "...", "signature": "..."}}
For single-file projects, return one file. For multi-file projects, break down into logical modules.
Return ONLY valid JSON in this format:
{{
"metadata": {{
"language": "{language}",
"projectType": "single_file",
"projectName": "generated-project"
}},
"files": [
{{
"id": "file_1",
"filename": "main.py",
"fileType": "py",
"dependencies": [],
"imports": [],
"functions": [],
"classes": []
}}
]
}}
"""
        # Use generic looping system with code_structure use case
        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_GENERATE,
            resultFormat="json"
        )
        structureJson = await self.services.ai.callAiWithLooping(
            prompt=structurePrompt,
            options=options,
            useCaseId="code_structure",
            debugPrefix="code_structure_generation",
            contentParts=contentParts
        )
        parsed = json.loads(structureJson)
        if not isinstance(parsed, dict):
            raise ValueError("Code structure response must be a JSON object")
        return parsed

    async def _generateCodeContent(
        self,
        codeStructure: Dict[str, Any],
        parentOperationId: str
    ) -> List[Dict[str, Any]]:
        """Generate code content for each file with dependency handling.

        Files are generated one after another in dependency order so that each
        prompt can mention what earlier files provide.

        Returns:
            Dependency files (requirements.txt / package.json) followed by the
            generated code files.

        Raises:
            ValueError: If the structure contains no usable file entries.
        """
        # Ignore malformed (non-object) entries and tolerate null values
        files = [f for f in (codeStructure.get("files") or []) if isinstance(f, dict)]
        metadata = codeStructure.get("metadata") or {}
        if not files:
            raise ValueError("No files found in code structure")

        # Step 1: Resolve dependency order (also guarantees every file has an id)
        orderedFiles = self._resolveDependencyOrder(files)

        # Step 2: Generate dependency files first (requirements.txt, package.json, etc.)
        dependencyFiles = await self._generateDependencyFiles(metadata, orderedFiles)

        # Step 3: Generate code files in dependency order (not fully parallel)
        codeFiles = []
        generatedFileContext = {}  # Track what's been generated for cross-file references
        totalFiles = len(orderedFiles)
        for idx, fileStructure in enumerate(orderedFiles):
            # Update progress: this phase spans 0.5 .. 0.9
            progress = 0.5 + (0.4 * (idx / totalFiles))
            self.services.chat.progressLogUpdate(
                parentOperationId,
                progress,
                f"Generating {fileStructure.get('filename', 'file')}"
            )

            # Provide context about already-generated files for proper imports
            fileContext = self._buildFileContext(generatedFileContext, fileStructure)

            # Generate this file with context
            fileContent = await self._generateSingleFileContent(
                fileStructure,
                fileContext=fileContext,
                allFilesStructure=orderedFiles,
                metadata=metadata
            )
            codeFiles.append(fileContent)

            # Update context with generated file info (for next files)
            generatedFileContext[fileStructure["id"]] = {
                "filename": fileContent.get("filename", fileStructure.get("filename")),
                "functions": fileContent.get("functions", []),
                "classes": fileContent.get("classes", []),
                "exports": fileContent.get("exports", [])
            }

        # Combine dependency files and code files
        return dependencyFiles + codeFiles

    def _resolveDependencyOrder(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Resolve file generation order based on dependencies using topological sort.

        Entries without an "id", or repeating an earlier id, get a generated
        unique id written into the entry so that no file is dropped and later
        steps can rely on ``file["id"]``. Dependencies on unknown ids are
        ignored; a circular dependency is logged and broken at the point where
        it is detected.

        Returns:
            The same file dicts, dependencies before dependents.
        """
        # Make ids present and unique
        takenIds = {f.get("id") for f in files if f.get("id")}
        seenIds = set()
        for position, file in enumerate(files, start=1):
            fileId = file.get("id")
            if fileId and fileId not in seenIds:
                seenIds.add(fileId)
                continue
            candidate = f"file_{position}"
            bump = 0
            while candidate in takenIds:
                bump += 1
                candidate = f"file_{position}_{bump}"
            if fileId:
                logger.warning(f"Duplicate file id {fileId} replaced with {candidate}")
            file["id"] = candidate
            takenIds.add(candidate)
            seenIds.add(candidate)

        # Build dependency graph
        fileMap = {f["id"]: f for f in files}
        dependencies = {}
        for file in files:
            deps = file.get("dependencies") or []  # List of file IDs this file depends on
            if not isinstance(deps, (list, tuple)):
                deps = []
            dependencies[file["id"]] = deps

        # Topological sort (depth-first)
        ordered = []
        visited = set()
        tempMark = set()  # ids on the current DFS path

        def visit(fileId):
            if fileId in tempMark:
                # Circular dependency detected - break it
                logger.warning(f"Circular dependency detected involving {fileId}")
                return
            if fileId in visited:
                return
            tempMark.add(fileId)
            for depId in dependencies.get(fileId, []):
                if depId in fileMap:
                    visit(depId)
            tempMark.remove(fileId)
            visited.add(fileId)
            ordered.append(fileMap[fileId])

        for file in files:
            if file["id"] not in visited:
                visit(file["id"])
        return ordered

    async def _generateDependencyFiles(
        self,
        metadata: Dict[str, Any],
        files: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Generate dependency files (requirements.txt, package.json, etc.).

        The choice is driven by ``metadata["language"]``; other languages
        produce no dependency file.
        """
        language = str(metadata.get("language") or "").lower()
        dependencyFiles = []

        # Generate requirements.txt for Python
        if language in ["python", "py"]:
            requirementsContent = await self._generateRequirementsTxt(files)
            if requirementsContent:
                dependencyFiles.append({
                    "filename": "requirements.txt",
                    "content": requirementsContent,
                    "fileType": "txt",
                    "id": "requirements_txt"
                })
        # Generate package.json for JavaScript/TypeScript
        elif language in ["javascript", "typescript", "js", "ts"]:
            packageJson = await self._generatePackageJson(files, metadata)
            if packageJson:
                dependencyFiles.append({
                    "filename": "package.json",
                    "content": json.dumps(packageJson, indent=2),
                    "fileType": "json",
                    "id": "package_json"
                })
        return dependencyFiles

    async def _generateRequirementsTxt(
        self,
        files: List[Dict[str, Any]]
    ) -> Optional[str]:
        """Generate requirements.txt content from Python imports.

        Handles ``import a``, ``import a.b as c``, ``import a, b`` and
        ``from a.b import c``. Relative imports, standard-library modules
        (when the interpreter exposes ``sys.stdlib_module_names``) and modules
        that are files of the generated project itself are skipped.

        Note: the import name is used as the requirement; it is not mapped to
        a differing distribution name.

        Returns:
            Newline-separated, sorted root package names, or None if none found.
        """
        import sys  # local import: only needed for the stdlib module list

        stdlibModules = getattr(sys, "stdlib_module_names", frozenset())

        # Names importable from the project itself (file stems and folders)
        localModules = set()
        for file in files:
            filename = file.get("filename")
            if isinstance(filename, str):
                for component in re.split(r"[\\/]", filename):
                    stem = component.split(".", 1)[0]
                    if stem:
                        localModules.add(stem)

        pythonPackages = set()
        for file in files:
            imports = file.get("imports", [])
            if not isinstance(imports, list):
                continue
            for imp in imports:
                if not isinstance(imp, str):
                    continue
                for line in imp.splitlines():
                    statement = line.split("#", 1)[0].strip()  # drop trailing comment
                    fromMatch = self._PY_FROM_IMPORT_PATTERN.match(statement)
                    if fromMatch:
                        modules = [fromMatch.group(1)]
                    else:
                        plainMatch = self._PY_PLAIN_IMPORT_PATTERN.match(statement)
                        if not plainMatch:
                            continue
                        # "a as x, b.c" -> ["a", "b.c"]
                        modules = [
                            clause.split()[0]
                            for clause in plainMatch.group(1).split(",")
                            if clause.strip()
                        ]
                    for module in modules:
                        root = module.split(".")[0]  # Get root package
                        if (
                            root.isidentifier()
                            and root != "__future__"
                            and root not in stdlibModules
                            and root not in localModules
                        ):
                            pythonPackages.add(root)

        if pythonPackages:
            return "\n".join(sorted(pythonPackages))
        return None

    @staticmethod
    def _npmPackageName(specifier: str) -> Optional[str]:
        """Reduce a JS module specifier to its installable npm package name.

        Returns None for relative/absolute paths, specifiers with a scheme
        (e.g. "node:fs", URLs) and Node.js core modules.
        """
        specifier = specifier.strip()
        if not specifier or specifier.startswith((".", "/")) or ":" in specifier:
            return None
        segments = specifier.split("/")
        if specifier.startswith("@"):
            # Scoped package: "@scope/name[/sub/path]"
            if len(segments) < 2 or not segments[1]:
                return None
            name = "/".join(segments[:2])
        else:
            # "name[/sub/path]"
            name = segments[0]
        if name in CodeGenerationPath._NODE_BUILTIN_MODULES:
            return None
        return name

    async def _generatePackageJson(
        self,
        files: List[Dict[str, Any]],
        metadata: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Generate package.json content from JavaScript/TypeScript imports.

        Recognises ES imports/re-exports (``... from 'pkg'``), side-effect
        imports (``import 'pkg'``), dynamic ``import('pkg')`` and CommonJS
        ``require('pkg')``. Sub-path imports are reduced to their package
        name; every dependency is listed with version "*".

        Returns:
            A dict with "name", "version" and "dependencies", or None when no
            external package is referenced.
        """
        npmPackages = {}
        for file in files:
            imports = file.get("imports", [])
            if not isinstance(imports, list):
                continue
            for imp in imports:
                if not isinstance(imp, str):
                    continue
                for match in self._JS_SPECIFIER_PATTERN.finditer(imp):
                    package = self._npmPackageName(match.group(1))
                    if package:
                        npmPackages[package] = "*"

        if npmPackages:
            return {
                "name": metadata.get("projectName", "generated-project"),
                "version": "1.0.0",
                "dependencies": npmPackages
            }
        return None

    @staticmethod
    def _symbolName(entry: Any) -> str:
        """Return the name of a function/class entry given as a dict or a bare string."""
        if isinstance(entry, dict):
            name = entry.get("name", "")
            return name if isinstance(name, str) else ""
        if isinstance(entry, str):
            return entry
        return ""

    def _buildFileContext(
        self,
        generatedFileContext: Dict[str, Dict[str, Any]],
        currentFile: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build context about other files for proper imports/references.

        Args:
            generatedFileContext: Info about already generated files, keyed by
                file id (each with "filename" and optional "functions",
                "classes", "exports").
            currentFile: Structure of the file about to be generated
                (currently not used to filter the context).

        Returns:
            Dict with "availableFiles" (list), "availableFunctions" and
            "availableClasses" (name -> defining file lookups).
        """
        context = {
            "availableFiles": [],
            "availableFunctions": {},
            "availableClasses": {}
        }

        # Add info about already-generated files
        for fileId, fileInfo in generatedFileContext.items():
            functions = fileInfo.get("functions") or []
            classes = fileInfo.get("classes") or []
            context["availableFiles"].append({
                "id": fileId,
                "filename": fileInfo["filename"],
                "functions": functions,
                "classes": classes,
                "exports": fileInfo.get("exports") or []
            })

            # Build function/class maps for easy lookup
            for func in functions:
                funcName = self._symbolName(func)
                if funcName:
                    context["availableFunctions"][funcName] = {
                        "file": fileInfo["filename"],
                        "signature": func.get("signature", "") if isinstance(func, dict) else ""
                    }
            for cls in classes:
                className = self._symbolName(cls)
                if className:
                    context["availableClasses"][className] = {
                        "file": fileInfo["filename"]
                    }
        return context

    async def _generateSingleFileContent(
        self,
        fileStructure: Dict[str, Any],
        fileContext: Optional[Dict[str, Any]] = None,
        allFilesStructure: Optional[List[Dict[str, Any]]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Generate code content for a single file with context about other files.

        Args:
            fileStructure: Planned structure of the file (filename, fileType,
                dependencies, functions, classes, id).
            fileContext: Result of ``_buildFileContext`` for already generated files.
            allFilesStructure: All planned files (currently not used in the prompt).
            metadata: Project metadata; its "language" is put into the prompt.

        Returns:
            Dict with "filename", "content", "fileType", "functions",
            "classes" and "id".

        Raises:
            ValueError: If the AI response is not valid JSON or not a JSON object.
        """
        # Build prompt with context about other files for proper imports
        filename = fileStructure.get("filename", "generated.py")
        fileType = fileStructure.get("fileType", "py")
        dependencies = fileStructure.get("dependencies") or []
        functions = fileStructure.get("functions") or []
        classes = fileStructure.get("classes") or []

        contextInfo = ""
        if fileContext and fileContext.get("availableFiles"):
            contextInfo = "\n\nAvailable files and their exports:\n"
            for fileInfo in fileContext["availableFiles"]:
                symbols = fileInfo.get("functions", []) + fileInfo.get("classes", [])
                exports = [name for name in map(self._symbolName, symbols) if name]
                contextInfo += f"- {fileInfo['filename']}: "
                if exports:
                    contextInfo += ", ".join(exports)
                contextInfo += "\n"

        # str() so that non-string ids (e.g. numbers) cannot break the join
        dependencyList = ', '.join(str(dep) for dep in dependencies)

        contentPrompt = f"""Generate complete, executable code for the file: {filename}
File Type: {fileType}
Language: {metadata.get('language', 'python') if metadata else 'python'}
Required functions:
{json.dumps(functions, indent=2) if functions else 'None specified'}
Required classes:
{json.dumps(classes, indent=2) if classes else 'None specified'}
Dependencies on other files: {dependencyList if dependencies else 'None'}
{contextInfo}
Generate complete, production-ready code with:
1. Proper imports (including imports from other files in the project if dependencies exist)
2. All required functions and classes
3. Error handling
4. Documentation/docstrings
5. Type hints where appropriate
Return ONLY valid JSON in this format:
{{
"files": [
{{
"filename": "{filename}",
"content": "// Complete code here",
"functions": {json.dumps(functions, indent=2) if functions else '[]'},
"classes": {json.dumps(classes, indent=2) if classes else '[]'}
}}
]
}}
"""
        # Use generic looping system with code_content use case
        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_GENERATE,
            resultFormat="json"
        )
        contentJson = await self.services.ai.callAiWithLooping(
            prompt=contentPrompt,
            options=options,
            useCaseId="code_content",
            debugPrefix=f"code_content_{fileStructure.get('id', 'file')}",
        )
        parsed = json.loads(contentJson)
        if not isinstance(parsed, dict):
            raise ValueError(f"Code content response for {filename} must be a JSON object")

        # Extract file content and metadata
        files = parsed.get("files") or []
        if isinstance(files, list) and files and isinstance(files[0], dict):
            fileData = files[0]
            return {
                "filename": fileData.get("filename", filename),
                "content": fileData.get("content") or "",
                "fileType": fileType,
                "functions": fileData.get("functions", functions),
                "classes": fileData.get("classes", classes),
                "id": fileStructure.get("id")
            }

        # Fallback if structure is different
        return {
            "filename": filename,
            "content": parsed.get("content") or "",
            "fileType": fileType,
            "functions": functions,
            "classes": classes,
            "id": fileStructure.get("id")
        }

    async def _formatAndValidateCode(self, codeFiles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format and validate generated code files.

        Currently only performs a basic clean-up of string contents: when the
        whole content is wrapped in a Markdown code fence, the wrapper is
        removed, and surrounding whitespace is stripped. Fences *inside* the
        content (e.g. in a generated README or a docstring) are left intact.
        The file dicts are updated in place and returned in a new list.
        """
        # TODO: Add code formatting (black, prettier, etc.) and validation
        formatted = []
        for file in codeFiles:
            content = file.get("content", "")
            if isinstance(content, str):
                text = content.strip()
                opening = self._FENCE_OPEN_PATTERN.match(text)
                if opening:
                    # Remove the wrapping fence only: opening line and, if
                    # present, the closing fence at the very end.
                    text = text[opening.end():]
                    text = self._FENCE_CLOSE_PATTERN.sub("", text).strip()
                file["content"] = text
            formatted.append(file)
        return formatted

    def _getMimeType(self, fileType: str) -> str:
        """Get MIME type for file type.

        The lookup is case-insensitive and ignores a leading dot; unknown or
        missing types map to "text/plain".
        """
        mimeTypes = {
            "py": "text/x-python",
            "js": "application/javascript",
            "ts": "application/typescript",
            "html": "text/html",
            "css": "text/css",
            "json": "application/json",
            "txt": "text/plain",
            "md": "text/markdown",
            "java": "text/x-java-source",
            "cpp": "text/x-c++src",
            "c": "text/x-csrc"
        }
        normalized = str(fileType or "").strip().lower().lstrip(".")
        return mimeTypes.get(normalized, "text/plain")