gateway/modules/workflows/methods/methodAi/actions/process.py
2026-01-02 21:35:32 +01:00

227 lines
11 KiB
Python

# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import logging
import time
import json
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallOptions
from modules.datamodels.datamodelExtraction import ContentPart
logger = logging.getLogger(__name__)
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
try:
# Init progress logger
workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
operationId = f"ai_process_{workflowId}_{int(time.time())}"
# Start progress tracking
parentOperationId = parameters.get('parentOperationId')
if not parentOperationId:
logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
else:
logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")
self.services.chat.progressLogStart(
operationId,
"Generate",
"AI Processing",
f"Format: {parameters.get('resultType', 'txt')}",
parentOperationId=parentOperationId
)
aiPrompt = parameters.get("aiPrompt")
logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
# Update progress - preparing parameters
self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")
from modules.datamodels.datamodelDocref import DocumentReferenceList
documentListParam = parameters.get("documentList")
# Convert to DocumentReferenceList if needed
if documentListParam is None:
documentList = DocumentReferenceList(references=[])
elif isinstance(documentListParam, DocumentReferenceList):
documentList = documentListParam
elif isinstance(documentListParam, str):
documentList = DocumentReferenceList.from_string_list([documentListParam])
elif isinstance(documentListParam, list):
documentList = DocumentReferenceList.from_string_list(documentListParam)
else:
logger.error(f"Invalid documentList type: {type(documentListParam)}")
documentList = DocumentReferenceList(references=[])
# Optional: if omitted, formats determined from prompt. Default "txt" is validation fallback only.
resultType = parameters.get("resultType")
if not aiPrompt:
logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
return ActionResult.isFailure(
error="AI prompt is required"
)
# Handle optional resultType: if None, formats determined from prompt by AI
if resultType:
normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
output_extension = f".{normalized_result_type}"
output_format = output_extension.replace('.', '') or 'txt'
logger.info(f"Using result type: {resultType} -> {output_extension}")
else:
# No format specified - AI will determine formats from prompt
normalized_result_type = None
output_extension = None
output_format = None
logger.debug("resultType not provided - formats will be determined from prompt by AI")
output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available
# Phase 7.3: Extract content first if documents provided, then use contentParts
# Check if contentParts are already provided (preferred path)
contentParts: Optional[List[ContentPart]] = None
if "contentParts" in parameters:
contentParts = parameters.get("contentParts")
if contentParts and not isinstance(contentParts, list):
# Try to extract from ContentExtracted if it's an ActionDocument
if hasattr(contentParts, 'parts'):
contentParts = contentParts.parts
else:
logger.warning(f"Invalid contentParts type: {type(contentParts)}, treating as empty")
contentParts = None
# If contentParts not provided but documentList is, extract content first
if not contentParts and documentList.references:
self.services.chat.progressLogUpdate(operationId, 0.3, "Extracting content from documents")
# Get ChatDocuments
chatDocuments = self.services.chat.getChatDocumentsFromDocumentList(documentList)
if not chatDocuments:
logger.warning("No documents found in documentList")
else:
logger.info(f"Extracting content from {len(chatDocuments)} documents")
# Prepare extraction options (use defaults if not provided)
from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
extractionOptions = parameters.get("extractionOptions")
if not extractionOptions:
extractionOptions = ExtractionOptions(
prompt="Extract all content from the document",
mergeStrategy=MergeStrategy(
mergeType="concatenate",
groupBy="typeGroup",
orderBy="id"
),
processDocumentsIndividually=True
)
# Extract content using extraction service
extractedResults = self.services.extraction.extractContent(chatDocuments, extractionOptions)
# Combine all ContentParts from all extracted results
contentParts = []
for extracted in extractedResults:
if extracted.parts:
contentParts.extend(extracted.parts)
logger.info(f"Extracted {len(contentParts)} content parts from {len(extractedResults)} documents")
# Update progress - preparing AI call
self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")
# Detect image generation from resultType (if provided)
imageFormats = ["png", "jpg", "jpeg", "gif", "webp"]
isImageGeneration = normalized_result_type in imageFormats if normalized_result_type else False
# Build options with correct operationType
from modules.datamodels.datamodelAi import OperationTypeEnum
options = AiCallOptions(
resultFormat=output_format or "txt", # Fallback for options, but outputFormat can be None for callAiContent
operationType=OperationTypeEnum.IMAGE_GENERATE if isImageGeneration else OperationTypeEnum.DATA_GENERATE
)
# Get generationIntent from parameters (required for DATA_GENERATE)
# Default to "document" if not provided (most common use case)
# For code generation, use ai.generateCode action or explicitly pass generationIntent="code"
generationIntent = parameters.get("generationIntent", "document")
# Update progress - calling AI
self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")
# Use unified callAiContent method with contentParts (extraction is now separate)
# ContentParts are already extracted above (or None if no documents)
# outputFormat: Optional - if None, formats determined from prompt by AI
aiResponse = await self.services.ai.callAiContent(
prompt=aiPrompt,
options=options,
contentParts=contentParts, # Already extracted (or None if no documents)
outputFormat=output_format, # Can be None - AI determines from prompt
parentOperationId=operationId,
generationIntent=generationIntent # REQUIRED for DATA_GENERATE
)
# Update progress - processing result
self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")
# Extract documents from AiResponse
if aiResponse.documents and len(aiResponse.documents) > 0:
action_documents = []
for doc in aiResponse.documents:
validationMetadata = {
"actionType": "ai.process",
"resultType": normalized_result_type,
"outputFormat": output_format,
"hasDocuments": True,
"documentCount": len(aiResponse.documents)
}
action_documents.append(ActionDocument(
documentName=doc.documentName,
documentData=doc.documentData,
mimeType=doc.mimeType or output_mime_type,
sourceJson=getattr(doc, 'sourceJson', None), # Preserve source JSON for structure validation
validationMetadata=validationMetadata
))
final_documents = action_documents
else:
# Text response - create document from content
extension = output_extension.lstrip('.') if output_extension else "txt"
meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension=extension,
action_name="result"
)
validationMetadata = {
"actionType": "ai.process",
"resultType": normalized_result_type or "auto",
"outputFormat": output_format or "auto",
"hasDocuments": False,
"contentType": "text"
}
action_document = ActionDocument(
documentName=meaningful_name,
documentData=aiResponse.content,
mimeType=output_mime_type,
validationMetadata=validationMetadata
)
final_documents = [action_document]
# Complete progress tracking
self.services.chat.progressLogFinish(operationId, True)
return ActionResult.isSuccess(documents=final_documents)
except Exception as e:
logger.error(f"Error in AI processing: {str(e)}")
# Complete progress tracking with failure
try:
self.services.chat.progressLogFinish(operationId, False)
except:
pass # Don't fail on progress logging errors
return ActionResult.isFailure(
error=str(e)
)