# Source: gateway/modules/workflows/methods/methodAi/actions/process.py
# (Python; 239 lines, 11 KiB in the original listing)

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
import time
import json
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions
from modules.datamodels.datamodelExtraction import ContentPart
logger = logging.getLogger(__name__)
def _finishProgressQuietly(owner: Any, operationId: Optional[str], success: bool) -> None:
    """Finish the progress entry for operationId without ever raising."""
    if operationId is None:
        # Failure happened before an operation id existed; nothing to close
        return
    try:
        owner.services.chat.progressLogFinish(operationId, success)
    except Exception as progressError:
        # Progress logging is best-effort and must not mask the real outcome
        logger.debug(f"ai.process: could not finish progress log '{operationId}': {progressError}")


def _coerceDocumentList(documentListParam: Any) -> Any:
    """Normalize the 'documentList' parameter into a DocumentReferenceList."""
    # Function-scope import, matching the original placement of this import
    from modules.datamodels.datamodelDocref import DocumentReferenceList

    if documentListParam is None:
        return DocumentReferenceList(references=[])
    if isinstance(documentListParam, DocumentReferenceList):
        return documentListParam
    if isinstance(documentListParam, str):
        return DocumentReferenceList.from_string_list([documentListParam])
    if isinstance(documentListParam, list):
        return DocumentReferenceList.from_string_list(documentListParam)
    logger.error(f"Invalid documentList type: {type(documentListParam)}")
    return DocumentReferenceList(references=[])


def _defaultGenerationIntent(normalized_result_type: str) -> str:
    """Pick a generationIntent from the result format when the caller gave none."""
    # This is format-based defaulting, not prompt-based auto-detection
    # Document formats (default to document generation)
    documentFormats = ["xlsx", "docx", "pdf", "txt", "md", "html", "csv", "xml", "json", "pptx"]
    # Code formats (should use ai.generateCode instead, but default to code if ai.process is used)
    codeFormats = ["py", "js", "ts", "java", "cpp", "c", "go", "rs", "rb", "php", "swift", "kt"]
    if normalized_result_type in documentFormats:
        logger.info(f"Defaulting generationIntent to 'document' for resultType '{normalized_result_type}'")
        return "document"
    if normalized_result_type in codeFormats:
        logger.info(f"Defaulting generationIntent to 'code' for resultType '{normalized_result_type}'")
        return "code"
    # Unknown format - default to document (most common use case)
    logger.warning(
        f"Unknown resultType '{normalized_result_type}', defaulting generationIntent to 'document'. "
        f"For code generation, use ai.generateCode action or explicitly pass generationIntent='code'."
    )
    return "document"


async def process(self, parameters: Dict[str, Any]) -> ActionResult:
    """Run the ai.process action: call the AI service and wrap its output as documents.

    Keys read from ``parameters``:
        aiPrompt: Prompt passed to the AI service. Required; a missing or empty
            value produces a failure result.
        resultType: Requested output format/extension (e.g. "txt", ".pdf", "png").
            Missing, None or empty values fall back to "txt".
        documentList: None, a DocumentReferenceList, a single string or a list of
            strings. Used to extract content when no contentParts are supplied.
        contentParts: Optional list of content parts, or an object exposing
            ``.parts``. When non-empty, document extraction is skipped.
        extractionOptions: Optional options forwarded to the extraction service;
            a default ExtractionOptions is built when absent.
        generationIntent: Optional intent forwarded to the AI call; defaulted
            from the result format for DATA_GENERATE operations.
        parentOperationId: Optional parent id for the progress log entry.

    Returns:
        ``ActionResult.isSuccess(documents=...)`` with one ActionDocument per
        document returned by the AI service, or a single document built from the
        response content when no documents are returned.
        ``ActionResult.isFailure(error=...)`` when the prompt is missing or any
        exception is raised; exceptions are logged and not propagated.
    """
    # Bound before the try block so the except handler can always reference it
    operationId: Optional[str] = None
    try:
        # Init progress logger
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"ai_process_{workflowId}_{int(time.time())}"

        # An explicit None/empty resultType falls back to "txt" instead of
        # being stringified into a bogus ".none" extension
        resultType = parameters.get("resultType") or "txt"

        # Start progress tracking
        parentOperationId = parameters.get('parentOperationId')
        if not parentOperationId:
            logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
        else:
            logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")
        self.services.chat.progressLogStart(
            operationId,
            "Generate",
            "AI Processing",
            f"Format: {resultType}",
            parentOperationId=parentOperationId
        )

        aiPrompt = parameters.get("aiPrompt")
        logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")

        # Update progress - preparing parameters
        self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")

        # Convert to DocumentReferenceList if needed
        documentList = _coerceDocumentList(parameters.get("documentList"))

        if not aiPrompt:
            logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
            # The progress entry was already started above; close it so it
            # does not stay open forever on this early exit
            _finishProgressQuietly(self, operationId, False)
            return ActionResult.isFailure(
                error="AI prompt is required"
            )

        # Determine output extension and default MIME type without duplicating service logic
        normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
        output_extension = f".{normalized_result_type}"
        output_mime_type = "application/octet-stream"  # Prefer service-provided mimeType when available
        logger.info(f"Using result type: {resultType} -> {output_extension}")

        # Phase 7.3: Extract content first if documents provided, then use contentParts
        # Check if contentParts are already provided (preferred path)
        contentParts: Optional[List[ContentPart]] = None
        if "contentParts" in parameters:
            contentParts = parameters.get("contentParts")
            if contentParts and not isinstance(contentParts, list):
                # Accept any object exposing a 'parts' attribute and unwrap it
                if hasattr(contentParts, 'parts'):
                    contentParts = contentParts.parts
                else:
                    logger.warning(f"Invalid contentParts type: {type(contentParts)}, treating as empty")
                    contentParts = None

        # If contentParts not provided but documentList is, extract content first
        if not contentParts and documentList.references:
            self.services.chat.progressLogUpdate(operationId, 0.3, "Extracting content from documents")
            # Get ChatDocuments
            chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
            if not chatDocuments:
                logger.warning("No documents found in documentList")
            else:
                logger.info(f"Extracting content from {len(chatDocuments)} documents")
                # Prepare extraction options (use defaults if not provided)
                from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
                extractionOptions = parameters.get("extractionOptions")
                if not extractionOptions:
                    extractionOptions = ExtractionOptions(
                        prompt="Extract all content from the document",
                        mergeStrategy=MergeStrategy(
                            mergeType="concatenate",
                            groupBy="typeGroup",
                            orderBy="id"
                        ),
                        processDocumentsIndividually=True
                    )
                # Extract content using extraction service
                extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions)
                # Combine all ContentParts from all extracted results
                contentParts = []
                for extracted in extractedResults:
                    if extracted.parts:
                        contentParts.extend(extracted.parts)
                logger.info(f"Extracted {len(contentParts)} content parts from {len(extractedResults)} documents")

        # Update progress - preparing AI call
        self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")

        # Detect image generation from resultType
        imageFormats = ["png", "jpg", "jpeg", "gif", "webp"]
        isImageGeneration = normalized_result_type in imageFormats

        # Build options with correct operationType
        output_format = output_extension.replace('.', '') or 'txt'
        from modules.datamodels.datamodelAi import OperationTypeEnum
        options = AiCallOptions(
            resultFormat=output_format,
            operationType=OperationTypeEnum.IMAGE_GENERATE if isImageGeneration else OperationTypeEnum.DATA_GENERATE
        )

        # For DATA_GENERATE, generationIntent is REQUIRED; default it from the
        # result format when the caller did not pass one
        generationIntent = parameters.get("generationIntent")
        if options.operationType == OperationTypeEnum.DATA_GENERATE and not generationIntent:
            generationIntent = _defaultGenerationIntent(normalized_result_type)

        # Update progress - calling AI
        self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")

        # Use unified callAiContent method with contentParts (extraction is now separate)
        aiResponse = await self.services.ai.callAiContent(
            prompt=aiPrompt,
            options=options,
            contentParts=contentParts,  # Already extracted (or None if no documents)
            outputFormat=output_format,
            parentOperationId=operationId,
            generationIntent=generationIntent  # REQUIRED for DATA_GENERATE
        )

        # Update progress - processing result
        self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")

        if aiResponse.documents:
            # Document response - wrap every returned document
            final_documents = []
            for doc in aiResponse.documents:
                # A fresh dict per document so later mutation of one document's
                # metadata cannot leak into the others
                validationMetadata = {
                    "actionType": "ai.process",
                    "resultType": normalized_result_type,
                    "outputFormat": output_format,
                    "hasDocuments": True,
                    "documentCount": len(aiResponse.documents)
                }
                final_documents.append(ActionDocument(
                    documentName=doc.documentName,
                    documentData=doc.documentData,
                    mimeType=doc.mimeType or output_mime_type,
                    sourceJson=getattr(doc, 'sourceJson', None),  # Preserve source JSON for structure validation
                    validationMetadata=validationMetadata
                ))
        else:
            # Text response - create document from content
            extension = output_extension.lstrip('.')
            meaningful_name = self._generateMeaningfulFileName(
                base_name="ai",
                extension=extension,
                action_name="result"
            )
            validationMetadata = {
                "actionType": "ai.process",
                "resultType": normalized_result_type,
                "outputFormat": output_format,
                "hasDocuments": False,
                "contentType": "text"
            }
            final_documents = [ActionDocument(
                documentName=meaningful_name,
                documentData=aiResponse.content,
                mimeType=output_mime_type,
                validationMetadata=validationMetadata
            )]

        # Complete progress tracking
        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(documents=final_documents)
    except Exception as e:
        # logger.exception keeps the same message but also records the traceback
        logger.exception(f"Error in AI processing: {str(e)}")
        # Complete progress tracking with failure (never raises)
        _finishProgressQuietly(self, operationId, False)
        return ActionResult.isFailure(
            error=str(e)
        )