gateway/modules/workflows/methods/methodAi/actions/generateCode.py

135 lines
5.5 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
import time
from typing import Dict, Any, Optional, List
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelWorkflow import AiResponse, DocumentData
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# Result type used when the caller gives none and the prompt offers no hint.
_DEFAULT_RESULT_TYPE = "txt"

# Ordered detection rules: (resultType, file extensions, keywords).
# The first rule that matches wins, so the order is significant.
_RESULT_TYPE_RULES = (
    ("html", (".html",), ("html file",)),
    ("js", (".js",), ("javascript",)),
    ("py", (".py",), ("python",)),
    ("ts", (".ts",), ("typescript",)),
    ("java", (".java",), ()),
    ("cpp", (".cpp", ".c++"), ()),
)

# MIME type for a document synthesised from raw AI content, keyed by result
# type. Anything not listed here falls back to "text/plain".
_MIME_TYPE_BY_RESULT_TYPE = {
    "html": "text/html",
    "js": "application/javascript",
    "py": "text/x-python",
    "ts": "application/typescript",
    "java": "text/x-java-source",
    "cpp": "text/x-c++src",
}


def _detectResultType(prompt: str) -> str:
    """Guess the output format from file extensions or keywords in *prompt*.

    An extension only counts when it is not immediately followed by another
    letter or digit, so ".json" is not mistaken for ".js" and ".tsv" is not
    mistaken for ".ts". Keywords are matched as plain substrings.
    Returns "txt" when nothing matches.
    """
    import re

    promptLower = prompt.lower()
    for resultType, extensions, keywords in _RESULT_TYPE_RULES:
        for extension in extensions:
            # Negative lookahead rejects longer extensions sharing this prefix.
            if re.search(re.escape(extension) + r"(?![a-z0-9])", promptLower):
                return resultType
        if any(keyword in promptLower for keyword in keywords):
            return resultType
    return _DEFAULT_RESULT_TYPE


def _toDocumentReferenceList(documentList: Any) -> Any:
    """Convert the raw ``documentList`` parameter to a DocumentReferenceList.

    Returns None for an empty/missing value. A single string is wrapped in a
    list, a list is converted as-is, and any other non-empty value yields an
    empty reference list.
    """
    if not documentList:
        return None

    # Imported lazily, and only when documents were actually supplied.
    from modules.datamodels.datamodelDocref import DocumentReferenceList

    if isinstance(documentList, DocumentReferenceList):
        return documentList
    if isinstance(documentList, str):
        return DocumentReferenceList.from_string_list([documentList])
    if isinstance(documentList, list):
        return DocumentReferenceList.from_string_list(documentList)
    return DocumentReferenceList(references=[])


def _resolveDocumentName(metadata: Any, resultType: str) -> str:
    """Pick a file name for a document built from raw AI content.

    Preference order: ``metadata.filename``, then a sanitised
    ``metadata.title`` (with the ``resultType`` extension appended when it is
    missing), then ``code.<resultType>``.
    """
    import re

    defaultName = f"code.{resultType}"
    if not metadata:
        return defaultName
    if metadata.filename:
        return metadata.filename
    if not metadata.title:
        return defaultName

    # Replace anything outside [a-zA-Z0-9._-] and collapse underscore runs.
    sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", metadata.title)
    sanitized = re.sub(r"_+", "_", sanitized).strip("_")
    if not sanitized:
        return defaultName
    if sanitized.lower().endswith(f".{resultType}"):
        return sanitized
    return f"{sanitized}.{resultType}"


async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult:
    """Generate code through the AI service and return it as documents.

    Keys read from ``parameters``:
        prompt: Required. Instruction passed to the AI service.
        documentList: Optional document references (a DocumentReferenceList,
            a single string, or a list).
        resultType: Optional output format such as "py" or "html". When
            missing or empty it is inferred from the prompt, defaulting to
            "txt". A string value is lower-cased and stripped of surrounding
            whitespace and leading dots.
        parentOperationId: Optional identifier forwarded to the AI call.

    Returns:
        A success ActionResult holding the documents returned by the AI
        service, or a single document built from the response content when
        no documents were returned. A failure ActionResult is returned when
        ``prompt`` is missing or when any exception occurs during the call.
    """
    prompt = parameters.get("prompt")
    if not prompt:
        return ActionResult.isFailure(error="prompt is required")

    resultType = parameters.get("resultType")
    if isinstance(resultType, str):
        # Normalise to the lowercase, dot-less form that auto-detection
        # produces, so ".PY" or "HTML" still hit the MIME and name lookups.
        resultType = resultType.strip().lstrip(".").lower()
    if not resultType:
        resultType = _detectResultType(prompt)

    parentOperationId = parameters.get("parentOperationId")

    try:
        docRefList = _toDocumentReferenceList(parameters.get("documentList", []))

        options = AiCallOptions(
            operationType=OperationTypeEnum.DATA_GENERATE,
            priority=PriorityEnum.BALANCED,
            processingMode=ProcessingModeEnum.DETAILED,
        )
        aiResponse: AiResponse = await self.services.ai.callAiContent(
            prompt=prompt,
            options=options,
            documentList=docRefList,
            outputFormat=resultType,
            title="Generated Code",
            parentOperationId=parentOperationId,
            generationIntent="code",  # Explicit intent, skips detection
        )

        # Convert each returned DocumentData into an ActionDocument.
        documents = [
            ActionDocument(
                documentName=docData.documentName,
                documentData=docData.documentData,
                mimeType=docData.mimeType,
                sourceJson=getattr(docData, "sourceJson", None),
            )
            for docData in (aiResponse.documents or [])
        ]

        # No documents but raw content: wrap the content in one document.
        if not documents and aiResponse.content:
            content = aiResponse.content
            if isinstance(content, str):
                content = content.encode("utf-8")
            documents.append(ActionDocument(
                documentName=_resolveDocumentName(aiResponse.metadata, resultType),
                documentData=content,
                mimeType=_MIME_TYPE_BY_RESULT_TYPE.get(resultType, "text/plain"),
            ))

        return ActionResult.isSuccess(documents=documents)
    except Exception as e:
        # Action boundary: report the failure to the caller instead of
        # raising, but keep the traceback in the log.
        logger.exception("Error in code generation: %s", e)
        return ActionResult.isFailure(error=str(e))