gateway/modules/workflows/methods/methodAi/actions/generateCode.py
2026-01-19 09:18:37 +01:00

123 lines
5.2 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
import time
from typing import Dict, Any, Optional, List
from modules.datamodels.datamodelChatbot import ActionResult, ActionDocument
from modules.datamodels.datamodelExtraction import ContentPart
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, PriorityEnum, ProcessingModeEnum
from modules.datamodels.datamodelWorkflow import AiResponse, DocumentData
logger = logging.getLogger(__name__)
async def generateCode(self, parameters: Dict[str, Any]) -> ActionResult:
prompt = parameters.get("prompt")
if not prompt:
return ActionResult.isFailure(error="prompt is required")
documentList = parameters.get("documentList", [])
# Optional: if omitted, formats determined from prompt by AI
resultType = parameters.get("resultType")
if not resultType:
logger.debug("resultType not provided - formats will be determined from prompt by AI")
# Create operation ID for progress tracking
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"code_gen_{workflowId}_{int(time.time())}"
parentOperationId = parameters.get('parentOperationId')
try:
# Convert documentList to DocumentReferenceList if needed
docRefList = None
if documentList:
from modules.datamodels.datamodelDocref import DocumentReferenceList
if isinstance(documentList, DocumentReferenceList):
docRefList = documentList
elif isinstance(documentList, str):
docRefList = DocumentReferenceList.from_string_list([documentList])
elif isinstance(documentList, list):
docRefList = DocumentReferenceList.from_string_list(documentList)
else:
docRefList = DocumentReferenceList(references=[])
# Prepare title
title = "Generated Code"
# Call AI service with explicit code intent
options = AiCallOptions(
operationType=OperationTypeEnum.DATA_GENERATE,
priority=PriorityEnum.BALANCED,
processingMode=ProcessingModeEnum.DETAILED
)
# outputFormat: Optional - if None, formats determined from prompt by AI
aiResponse: AiResponse = await self.services.ai.callAiContent(
prompt=prompt,
options=options,
documentList=docRefList,
outputFormat=resultType, # Can be None - AI determines from prompt
title=title,
parentOperationId=parentOperationId,
generationIntent="code" # Explicit intent, skips detection
)
# Convert AiResponse to ActionResult
documents = []
# Convert DocumentData to ActionDocument
if aiResponse.documents:
for docData in aiResponse.documents:
documents.append(ActionDocument(
documentName=docData.documentName,
documentData=docData.documentData,
mimeType=docData.mimeType,
sourceJson=docData.sourceJson if hasattr(docData, 'sourceJson') else None
))
# If no documents but content exists, create a document from content
if not documents and aiResponse.content:
# Determine document name from metadata
resultTypeFallback = resultType or "txt" # Fallback for file naming
docName = f"code.{resultTypeFallback}"
if aiResponse.metadata and aiResponse.metadata.filename:
docName = aiResponse.metadata.filename
elif aiResponse.metadata and aiResponse.metadata.title:
import re
sanitized = re.sub(r"[^a-zA-Z0-9._-]", "_", aiResponse.metadata.title)
sanitized = re.sub(r"_+", "_", sanitized).strip("_")
if sanitized:
if not sanitized.lower().endswith(f".{resultTypeFallback}"):
docName = f"{sanitized}.{resultTypeFallback}"
else:
docName = sanitized
# Determine mime type
mimeType = "text/plain"
if resultType == "html":
mimeType = "text/html"
elif resultType == "js":
mimeType = "application/javascript"
elif resultType == "py":
mimeType = "text/x-python"
elif resultType == "ts":
mimeType = "application/typescript"
elif resultType == "java":
mimeType = "text/x-java-source"
elif resultType == "cpp":
mimeType = "text/x-c++src"
documents.append(ActionDocument(
documentName=docName,
documentData=aiResponse.content.encode('utf-8') if isinstance(aiResponse.content, str) else aiResponse.content,
mimeType=mimeType
))
return ActionResult.isSuccess(documents=documents)
except Exception as e:
logger.error(f"Error in code generation: {str(e)}")
return ActionResult.isFailure(error=str(e))