# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
"""
Code Generation Path

Handles code generation with multi-file project support, dependency
handling, and proper cross-file references.
"""

import json
import logging
import time
import re
from typing import Dict, Any, List, Optional

from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata, DocumentData
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum

logger = logging.getLogger(__name__)


class CodeGenerationPath:
    """Code generation path: structure -> content -> format/validate -> render."""

    def __init__(self, services):
        # Service container providing .ai, .chat, .workflow, etc.
        self.services = services

    async def generateCode(
        self,
        userPrompt: str,
        outputFormat: str = None,
        contentParts: Optional[List[ContentPart]] = None,
        title: str = "Generated Code",
        parentOperationId: Optional[str] = None
    ) -> AiResponse:
        """
        Generate code files with multi-file project support.

        Runs four phases under a single progress operation:
        1. structure generation, 2. per-file content generation,
        3. formatting/validation, 4. rendering to documents.

        Args:
            userPrompt: Natural-language description of the code to generate.
            outputFormat: Optional target file extension (e.g. "py", "js").
            contentParts: Optional extracted content forwarded to the AI call.
            title: Title stored in the response metadata.
            parentOperationId: Optional parent operation for nested progress.

        Returns:
            AiResponse with the generated code files as documents.

        Raises:
            Re-raises any exception from the generation phases after
            logging it and closing the progress operation.
        """
        # Create operation ID (fall back to a timestamp when no workflow exists)
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        codeOperationId = f"code_gen_{workflowId}_{int(time.time())}"

        # Start progress tracking
        self.services.chat.progressLogStart(
            codeOperationId,
            "Code Generation",
            "Code Generation",
            f"Format: {outputFormat or 'txt'}",
            parentOperationId=parentOperationId
        )

        try:
            # Detect language and project type from prompt or outputFormat
            language, projectType = self._detectLanguageAndProjectType(userPrompt, outputFormat)

            # Phase 1: Code structure generation (with looping)
            self.services.chat.progressLogUpdate(codeOperationId, 0.2, "Generating code structure")
            codeStructure = await self._generateCodeStructure(
                userPrompt=userPrompt,
                language=language,
                outputFormat=outputFormat,
                contentParts=contentParts
            )

            # Phase 2: Code content generation (with dependency handling)
            self.services.chat.progressLogUpdate(codeOperationId, 0.5, "Generating code content")
            codeFiles = await self._generateCodeContent(codeStructure, codeOperationId)

            # Phase 3: Code formatting & validation
            self.services.chat.progressLogUpdate(codeOperationId, 0.8, "Formatting code files")
            formattedFiles = await self._formatAndValidateCode(codeFiles)

            # Phase 4: Code Rendering (Renderer-Based)
            self.services.chat.progressLogUpdate(codeOperationId, 0.9, "Rendering code files")

            # Group files by format so each format group can use its own renderer
            filesByFormat = {}
            for file in formattedFiles:
                fileType = file.get("fileType", outputFormat or "txt")
                if fileType not in filesByFormat:
                    filesByFormat[fileType] = []
                filesByFormat[fileType].append(file)

            # Render each format group using the appropriate renderer
            allRenderedDocuments = []
            # Hoisted out of the loop: import is loop-invariant
            from modules.datamodels.datamodelDocument import RenderedDocument
            for fileType, files in filesByFormat.items():
                renderer = self._getCodeRenderer(fileType)
                if renderer:
                    # Use code renderer
                    renderedDocs = await renderer.renderCodeFiles(
                        codeFiles=files,
                        metadata=codeStructure.get("metadata", {}),
                        userPrompt=userPrompt
                    )
                    allRenderedDocuments.extend(renderedDocs)
                else:
                    # Fallback: output directly (for formats without renderers)
                    for file in files:
                        mimeType = self._getMimeType(file.get("fileType", "txt"))
                        content = file.get("content", "")
                        contentBytes = content.encode('utf-8') if isinstance(content, str) else content
                        allRenderedDocuments.append(
                            RenderedDocument(
                                documentData=contentBytes,
                                mimeType=mimeType,
                                filename=file.get("filename", "generated.txt"),
                                metadata=codeStructure.get("metadata", {})
                            )
                        )

            # Convert RenderedDocument to DocumentData
            documents = []
            for renderedDoc in allRenderedDocuments:
                documents.append(DocumentData(
                    documentName=renderedDoc.filename,
                    documentData=renderedDoc.documentData,
                    mimeType=renderedDoc.mimeType,
                    sourceJson=renderedDoc.metadata if hasattr(renderedDoc, 'metadata') else None
                ))

            metadata = AiResponseMetadata(
                title=title,
                operationType=OperationTypeEnum.DATA_GENERATE.value
            )

            self.services.chat.progressLogFinish(codeOperationId, True)

            return AiResponse(
                documents=documents,
                content=None,
                metadata=metadata
            )

        except Exception as e:
            logger.error(f"Error in code generation: {str(e)}")
            self.services.chat.progressLogFinish(codeOperationId, False)
            raise

    def _detectLanguageAndProjectType(self, userPrompt: str, outputFormat: Optional[str]) -> tuple:
        """Detect programming language and project type from prompt or format.

        Returns:
            (language, projectType) where projectType is "single_file" or
            "multi_file". Defaults to ("python", "single_file").
        """
        promptLower = userPrompt.lower()

        # Detect language: explicit outputFormat wins over keyword sniffing
        language = None
        if outputFormat:
            if outputFormat == "py":
                language = "python"
            elif outputFormat in ["js", "ts"]:
                language = outputFormat
            elif outputFormat == "html":
                language = "html"

        if not language:
            if "python" in promptLower or ".py" in promptLower:
                language = "python"
            elif "javascript" in promptLower or ".js" in promptLower:
                language = "javascript"
            elif "typescript" in promptLower or ".ts" in promptLower:
                language = "typescript"
            elif "html" in promptLower:
                language = "html"
            else:
                language = "python"  # Default

        # Detect project type
        projectType = "single_file"
        if "multi" in promptLower or "multiple files" in promptLower or "project" in promptLower:
            projectType = "multi_file"

        return language, projectType

    async def _generateCodeStructure(
        self,
        userPrompt: str,
        language: str,
        outputFormat: Optional[str],
        contentParts: Optional[List[ContentPart]]
    ) -> Dict[str, Any]:
        """Generate the project/file structure as JSON using the looping system.

        Raises:
            json.JSONDecodeError: if the AI response is not valid JSON.
        """
        # Build structure generation prompt
        structurePrompt = f"""Analyze the following code generation request and create a project structure.

Request: {userPrompt}
Language: {language}

Create a JSON structure with:
1. metadata: {{"language": "{language}", "projectType": "single_file|multi_file", "projectName": "..."}}
2. files: Array of file structures, each with:
   - id: Unique identifier
   - filename: File name (e.g., "main.py", "utils.py")
   - fileType: File extension (e.g., "py", "js")
   - dependencies: List of file IDs this file depends on (for multi-file projects)
   - imports: List of import statements (for dependency extraction)
   - functions: Array of function signatures {{"name": "...", "signature": "..."}}
   - classes: Array of class definitions {{"name": "...", "signature": "..."}}

For single-file projects, return one file. For multi-file projects, break down into logical modules.

Return ONLY valid JSON in this format:
{{
    "metadata": {{
        "language": "{language}",
        "projectType": "single_file",
        "projectName": "generated-project"
    }},
    "files": [
        {{
            "id": "file_1",
            "filename": "main.py",
            "fileType": "py",
            "dependencies": [],
            "imports": [],
            "functions": [],
            "classes": []
        }}
    ]
}}
"""

        # Use generic looping system with code_structure use case
        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_GENERATE,
            resultFormat="json"
        )

        structureJson = await self.services.ai.callAiWithLooping(
            prompt=structurePrompt,
            options=options,
            useCaseId="code_structure",
            debugPrefix="code_structure_generation",
            contentParts=contentParts
        )

        parsed = json.loads(structureJson)
        return parsed

    async def _generateCodeContent(
        self,
        codeStructure: Dict[str, Any],
        parentOperationId: str
    ) -> List[Dict[str, Any]]:
        """Generate code content for each file with dependency handling.

        Files are generated sequentially in dependency order so each file
        can reference exports of already-generated files.

        Raises:
            ValueError: if the structure contains no files.
        """
        files = codeStructure.get("files", [])
        metadata = codeStructure.get("metadata", {})

        if not files:
            raise ValueError("No files found in code structure")

        # Step 1: Resolve dependency order
        orderedFiles = self._resolveDependencyOrder(files)

        # Step 2: Generate dependency files first (requirements.txt, package.json, etc.)
        dependencyFiles = await self._generateDependencyFiles(metadata, orderedFiles)

        # Step 3: Generate code files in dependency order (not fully parallel)
        codeFiles = []
        generatedFileContext = {}  # Track what's been generated for cross-file references

        for idx, fileStructure in enumerate(orderedFiles):
            # Update progress (phase 2 occupies the 0.5-0.9 range)
            progress = 0.5 + (0.4 * (idx / len(orderedFiles)))
            self.services.chat.progressLogUpdate(
                parentOperationId,
                progress,
                f"Generating {fileStructure.get('filename', 'file')}"
            )

            # Provide context about already-generated files for proper imports
            fileContext = self._buildFileContext(generatedFileContext, fileStructure)

            # Generate this file with context
            fileContent = await self._generateSingleFileContent(
                fileStructure,
                fileContext=fileContext,
                allFilesStructure=orderedFiles,
                metadata=metadata
            )

            codeFiles.append(fileContent)

            # Update context with generated file info (for next files)
            generatedFileContext[fileStructure["id"]] = {
                "filename": fileContent.get("filename", fileStructure.get("filename")),
                "functions": fileContent.get("functions", []),
                "classes": fileContent.get("classes", []),
                "exports": fileContent.get("exports", [])
            }

        # Combine dependency files and code files
        return dependencyFiles + codeFiles

    def _resolveDependencyOrder(self, files: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Resolve file generation order based on dependencies using topological sort.

        Circular dependencies are logged and broken (the back-edge is
        ignored) rather than raising, so generation can still proceed.
        """
        # Build dependency graph
        fileMap = {f["id"]: f for f in files}
        dependencies = {}
        for file in files:
            fileId = file["id"]
            deps = file.get("dependencies", [])  # List of file IDs this file depends on
            dependencies[fileId] = deps

        # Topological sort (DFS with a temporary mark for cycle detection)
        ordered = []
        visited = set()
        tempMark = set()

        def visit(fileId: str):
            if fileId in tempMark:
                # Circular dependency detected - break it
                logger.warning(f"Circular dependency detected involving {fileId}")
                return
            if fileId in visited:
                return
            tempMark.add(fileId)
            for depId in dependencies.get(fileId, []):
                if depId in fileMap:
                    visit(depId)
            tempMark.remove(fileId)
            visited.add(fileId)
            ordered.append(fileMap[fileId])

        for file in files:
            if file["id"] not in visited:
                visit(file["id"])

        return ordered

    async def _generateDependencyFiles(
        self,
        metadata: Dict[str, Any],
        files: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """Generate dependency files (requirements.txt, package.json, etc.)."""
        language = metadata.get("language", "").lower()
        dependencyFiles = []

        # Generate requirements.txt for Python
        if language in ["python", "py"]:
            requirementsContent = await self._generateRequirementsTxt(files)
            if requirementsContent:
                dependencyFiles.append({
                    "filename": "requirements.txt",
                    "content": requirementsContent,
                    "fileType": "txt",
                    "id": "requirements_txt"
                })

        # Generate package.json for JavaScript/TypeScript
        elif language in ["javascript", "typescript", "js", "ts"]:
            packageJson = await self._generatePackageJson(files, metadata)
            if packageJson:
                dependencyFiles.append({
                    "filename": "package.json",
                    "content": json.dumps(packageJson, indent=2),
                    "fileType": "json",
                    "id": "package_json"
                })

        return dependencyFiles

    async def _generateRequirementsTxt(
        self,
        files: List[Dict[str, Any]]
    ) -> Optional[str]:
        """Generate requirements.txt content from Python import statements.

        Extracts the root package name from "import x[.y]" and
        "from x[.y] import ..." statements; relative imports (leading ".")
        never match and are skipped.

        Returns:
            Sorted newline-joined package names, or None if none found.
        """
        pythonPackages = set()

        for file in files:
            imports = file.get("imports", [])
            if not isinstance(imports, list):
                continue
            for imp in imports:
                if not isinstance(imp, str):
                    continue
                imp = imp.strip()
                # Anchored regexes instead of substring split(): the old
                # split("import") approach broke on names containing the
                # word "import" (e.g. "import importlib") and on
                # comma-separated imports ("import os, sys").
                matchFrom = re.match(r"^from\s+([A-Za-z_]\w*)", imp)
                if matchFrom:
                    pythonPackages.add(matchFrom.group(1))  # root package only
                    continue
                matchImport = re.match(r"^import\s+([A-Za-z_]\w*)", imp)
                if matchImport:
                    pythonPackages.add(matchImport.group(1))

        if pythonPackages:
            return "\n".join(sorted(pythonPackages))

        return None

    async def _generatePackageJson(
        self,
        files: List[Dict[str, Any]],
        metadata: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Generate package.json content from JavaScript/TypeScript imports.

        Handles ES6 ("import x from 'pkg'") and CommonJS
        ("require('pkg')") forms; relative ("./") and absolute ("/")
        specifiers are skipped. Versions are pinned as "*".

        Returns:
            A package.json dict, or None if no npm packages were found.
        """
        npmPackages = {}

        for file in files:
            imports = file.get("imports", [])
            if not isinstance(imports, list):
                continue
            for imp in imports:
                if not isinstance(imp, str):
                    continue
                imp = imp.strip()
                # ES6 import: "import ... from 'package'"
                matchEs6 = re.search(r"from\s+['\"]([^'\"]+)['\"]", imp)
                if matchEs6:
                    package = matchEs6.group(1)
                    if package and not package.startswith(".") and not package.startswith("/"):
                        npmPackages[package] = "*"
                    continue
                # CommonJS: "require('package')"
                matchRequire = re.search(r"require\(['\"]([^'\"]+)['\"]\)", imp)
                if matchRequire:
                    package = matchRequire.group(1)
                    if not package.startswith(".") and not package.startswith("/"):
                        npmPackages[package] = "*"

        if npmPackages:
            return {
                "name": metadata.get("projectName", "generated-project"),
                "version": "1.0.0",
                "dependencies": npmPackages
            }

        return None

    def _buildFileContext(
        self,
        generatedFileContext: Dict[str, Dict[str, Any]],
        currentFile: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Build context about other files for proper imports/references.

        Args:
            generatedFileContext: Per-file-id info gathered from files that
                have already been generated.
            currentFile: Structure of the file about to be generated
                (currently unused beyond signature compatibility).

        Returns:
            Dict with availableFiles (list), availableFunctions and
            availableClasses (name -> location maps).
        """
        context = {
            "availableFiles": [],
            "availableFunctions": {},
            "availableClasses": {}
        }

        # Add info about already-generated files
        for fileId, fileInfo in generatedFileContext.items():
            context["availableFiles"].append({
                "id": fileId,
                "filename": fileInfo["filename"],
                "functions": fileInfo.get("functions", []),
                "classes": fileInfo.get("classes", []),
                "exports": fileInfo.get("exports", [])
            })

            # Build function/class maps for easy lookup
            for func in fileInfo.get("functions", []):
                funcName = func.get("name", "")
                if funcName:
                    context["availableFunctions"][funcName] = {
                        "file": fileInfo["filename"],
                        "signature": func.get("signature", "")
                    }

            for cls in fileInfo.get("classes", []):
                className = cls.get("name", "")
                if className:
                    context["availableClasses"][className] = {
                        "file": fileInfo["filename"]
                    }

        return context

    async def _generateSingleFileContent(
        self,
        fileStructure: Dict[str, Any],
        fileContext: Dict[str, Any] = None,
        allFilesStructure: List[Dict[str, Any]] = None,
        metadata: Dict[str, Any] = None
    ) -> Dict[str, Any]:
        """Generate code content for a single file with context about other files.

        Raises:
            json.JSONDecodeError: if the AI response is not valid JSON.
        """
        filename = fileStructure.get("filename", "generated.py")
        fileType = fileStructure.get("fileType", "py")
        dependencies = fileStructure.get("dependencies", [])
        functions = fileStructure.get("functions", [])
        classes = fileStructure.get("classes", [])

        # Summarize the exports of already-generated files for the prompt
        contextInfo = ""
        if fileContext and fileContext.get("availableFiles"):
            contextInfo = "\n\nAvailable files and their exports:\n"
            for fileInfo in fileContext["availableFiles"]:
                contextInfo += f"- {fileInfo['filename']}: "
                funcs = [f.get("name", "") for f in fileInfo.get("functions", [])]
                cls = [c.get("name", "") for c in fileInfo.get("classes", [])]
                exports = []
                if funcs:
                    exports.extend(funcs)
                if cls:
                    exports.extend(cls)
                if exports:
                    contextInfo += ", ".join(exports)
                contextInfo += "\n"

        # BUGFIX: the prompt previously contained the literal text "(unknown)"
        # instead of interpolating the target filename, so the model was
        # never told which file to generate.
        contentPrompt = f"""Generate complete, executable code for the file: {filename}

File Type: {fileType}
Language: {metadata.get('language', 'python') if metadata else 'python'}

Required functions:
{json.dumps(functions, indent=2) if functions else 'None specified'}

Required classes:
{json.dumps(classes, indent=2) if classes else 'None specified'}

Dependencies on other files: {', '.join(dependencies) if dependencies else 'None'}
{contextInfo}
Generate complete, production-ready code with:
1. Proper imports (including imports from other files in the project if dependencies exist)
2. All required functions and classes
3. Error handling
4. Documentation/docstrings
5. Type hints where appropriate

Return ONLY valid JSON in this format:
{{
    "files": [
        {{
            "filename": "{filename}",
            "content": "// Complete code here",
            "functions": {json.dumps(functions, indent=2) if functions else '[]'},
            "classes": {json.dumps(classes, indent=2) if classes else '[]'}
        }}
    ]
}}
"""

        # Use generic looping system with code_content use case
        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_GENERATE,
            resultFormat="json"
        )

        contentJson = await self.services.ai.callAiWithLooping(
            prompt=contentPrompt,
            options=options,
            useCaseId="code_content",
            debugPrefix=f"code_content_{fileStructure.get('id', 'file')}",
        )

        parsed = json.loads(contentJson)

        # Extract file content and metadata
        files = parsed.get("files", [])
        if files and len(files) > 0:
            fileData = files[0]
            return {
                "filename": fileData.get("filename", filename),
                "content": fileData.get("content", ""),
                "fileType": fileType,
                "functions": fileData.get("functions", functions),
                "classes": fileData.get("classes", classes),
                "id": fileStructure.get("id")
            }

        # Fallback if structure is different
        return {
            "filename": filename,
            "content": parsed.get("content", ""),
            "fileType": fileType,
            "functions": functions,
            "classes": classes,
            "id": fileStructure.get("id")
        }

    async def _formatAndValidateCode(self, codeFiles: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Format and validate generated code files.

        Currently only strips markdown code fences the model may have
        wrapped around the code.
        # TODO: Add code formatting (black, prettier, etc.) and validation
        """
        formatted = []
        for file in codeFiles:
            content = file.get("content", "")

            # Basic cleanup: remove markdown code fences if present
            if isinstance(content, str):
                content = re.sub(r'^```[\w]*\n', '', content, flags=re.MULTILINE)
                content = re.sub(r'\n```$', '', content, flags=re.MULTILINE)
                file["content"] = content.strip()

            formatted.append(file)

        return formatted

    def _getMimeType(self, fileType: str) -> str:
        """Get MIME type for a file extension; defaults to text/plain."""
        mimeTypes = {
            "py": "text/x-python",
            "js": "application/javascript",
            "ts": "application/typescript",
            "html": "text/html",
            "css": "text/css",
            "json": "application/json",
            "txt": "text/plain",
            "md": "text/markdown",
            "java": "text/x-java-source",
            "cpp": "text/x-c++src",
            "c": "text/x-csrc",
            "csv": "text/csv",
            "xml": "application/xml"
        }
        return mimeTypes.get(fileType.lower(), "text/plain")

    def _getCodeRenderer(self, fileType: str):
        """Get a code-capable renderer for the file type, or None.

        Only formats listed in formatMap have renderers; the renderer must
        additionally expose renderCodeFiles to qualify.
        """
        from modules.services.serviceGeneration.renderers.registry import getRenderer

        # Map file types to renderer formats
        formatMap = {
            'json': 'json',
            'csv': 'csv',
            'xml': 'xml'
        }

        rendererFormat = formatMap.get(fileType.lower())
        if rendererFormat:
            renderer = getRenderer(rendererFormat, self.services)
            # Check if renderer supports code rendering
            if renderer and hasattr(renderer, 'renderCodeFiles'):
                return renderer

        return None