276 lines
14 KiB
Python
276 lines
14 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
|
# All rights reserved.
|
|
|
|
import logging
|
|
import time
|
|
import json
|
|
from typing import Dict, Any, List, Optional
|
|
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
|
|
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
|
|
from modules.datamodels.datamodelExtraction import ContentPart
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def _finishProgressQuietly(action, operationId: Optional[str], success: bool) -> None:
    """Close the progress entry for ``operationId`` without raising.

    Used on failure paths, where an error inside the progress logger must not
    replace the error that is actually being reported to the caller.

    Args:
        action: Object exposing ``services.chat.progressLogFinish``.
        operationId: Progress operation id, or None if it was never created.
        success: Final status forwarded to ``progressLogFinish``.
    """
    if operationId is None:
        # Failure happened before the operation id existed - nothing to close.
        return
    try:
        action.services.chat.progressLogFinish(operationId, success)
    except Exception as progressError:
        # Best effort only: never fail on progress logging errors.
        logger.debug(f"ai.process: could not finish progress log '{operationId}': {progressError}")


def _coerceDocumentList(documentListParam: Any) -> Any:
    """Convert the raw ``documentList`` parameter into a DocumentReferenceList.

    Accepts None, an existing DocumentReferenceList, a single string or a
    list (both passed to ``DocumentReferenceList.from_string_list``). Any other
    type is logged as an error and replaced by an empty reference list.

    Returns:
        A DocumentReferenceList instance (never None).
    """
    # Imported lazily, exactly as the original implementation did.
    from modules.datamodels.datamodelDocref import DocumentReferenceList

    if documentListParam is None:
        logger.debug("ai.process: documentList is None, using empty DocumentReferenceList")
        return DocumentReferenceList(references=[])

    if isinstance(documentListParam, DocumentReferenceList):
        logger.info(f"ai.process: Received DocumentReferenceList with {len(documentListParam.references)} references")
        for idx, ref in enumerate(documentListParam.references):
            # getattr: the full-mode branch of process() treats some references
            # as label-based list references, so a log line must not depend on
            # every reference exposing documentId.
            logger.info(f" Reference {idx + 1}: documentId={getattr(ref, 'documentId', None)}, type={type(ref).__name__}")
        return documentListParam

    if isinstance(documentListParam, str):
        documentList = DocumentReferenceList.from_string_list([documentListParam])
        logger.info(f"ai.process: Converted string to DocumentReferenceList with {len(documentList.references)} references")
        return documentList

    if isinstance(documentListParam, list):
        documentList = DocumentReferenceList.from_string_list(documentListParam)
        logger.info(f"ai.process: Converted list to DocumentReferenceList with {len(documentList.references)} references")
        return documentList

    logger.error(f"Invalid documentList type: {type(documentListParam)}")
    return DocumentReferenceList(references=[])


def _resolveContentParts(parameters: Dict[str, Any]) -> Optional[List[ContentPart]]:
    """Return the optional ``contentParts`` parameter as a list of parts.

    A list is passed through unchanged; an object with a ``parts`` attribute
    contributes that attribute. Missing, empty or unsupported values yield None
    (unsupported types are additionally logged as a warning).
    """
    contentPartsParam = parameters.get("contentParts")
    if not contentPartsParam:
        return None
    if isinstance(contentPartsParam, list):
        return contentPartsParam
    if hasattr(contentPartsParam, 'parts'):
        # Container object carrying the parts (e.g. an extraction result).
        return contentPartsParam.parts
    logger.warning(f"Invalid contentParts type: {type(contentPartsParam)}, treating as empty")
    return None


def _collectSimpleModeContext(action, documentList: Any) -> str:
    """Build the plain-text context for simple mode from referenced documents.

    Loads each referenced chat document that has a ``fileId``, decodes bytes as
    UTF-8 (undecodable bytes are ignored) or stringifies other payloads, and
    joins the texts with blank lines. Any error is logged as a warning and
    results in an empty context, so the AI call can still proceed.

    Returns:
        The joined document text, or "" when nothing could be collected.
    """
    if not documentList or not documentList.references:
        return ""
    try:
        # Get documents from workflow
        documents = action.services.chat.getChatDocumentsFromDocumentList(documentList)
        context_parts = []
        for doc in documents:
            if not (hasattr(doc, 'fileId') and doc.fileId):
                continue
            fileData = action.services.interfaceDbComponent.getFileData(doc.fileId)
            if not fileData:
                continue
            if isinstance(fileData, bytes):
                context_parts.append(fileData.decode('utf-8', errors='ignore'))
            else:
                context_parts.append(str(fileData))
        return "\n\n".join(context_parts)
    except Exception as e:
        logger.warning(f"Error extracting context from documents in simple mode: {e}")
        return ""


async def process(self, parameters: Dict[str, Any]) -> ActionResult:
    """Run the ``ai.process`` action: call the AI service and wrap its output.

    Keys read from ``parameters``:
        aiPrompt: Required prompt text; the action fails if missing or empty.
        documentList: Optional DocumentReferenceList, string or list of
            document references.
        resultType: Optional output format (e.g. "pdf", ".txt"). When omitted,
            no format is passed to the full-mode call; "txt" is only used as a
            fallback for simple mode and for the generated file name.
        simpleMode: When truthy, performs a direct ``callAi`` request with the
            referenced documents as text context instead of ``callAiContent``.
        contentParts: Optional list of content parts (or an object with a
            ``parts`` attribute), forwarded to ``callAiContent``.
        generationIntent: Forwarded to ``callAiContent``; defaults to
            "document".
        parentOperationId: Optional parent for the progress-log entry.

    Returns:
        ``ActionResult.isSuccess`` carrying one ActionDocument per document in
        the AI response, or a single document built from the response content
        when the response has no documents. ``ActionResult.isFailure`` when the
        prompt is missing or any exception occurs; exceptions are not
        propagated to the caller.
    """
    # Bound before the try block so the failure handler can always test it.
    operationId: Optional[str] = None
    try:
        # Init progress logger
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"ai_process_{workflowId}_{int(time.time())}"

        # Start progress tracking
        parentOperationId = parameters.get('parentOperationId')
        if not parentOperationId:
            logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
        else:
            logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")

        self.services.chat.progressLogStart(
            operationId,
            "Generate",
            "AI Processing",
            f"Format: {parameters.get('resultType', 'txt')}",
            parentOperationId=parentOperationId
        )

        aiPrompt = parameters.get("aiPrompt")
        logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")

        if not aiPrompt:
            logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
            # The progress entry was already started above; close it so the
            # operation is not left open after this early return.
            _finishProgressQuietly(self, operationId, False)
            return ActionResult.isFailure(
                error="AI prompt is required"
            )

        # Update progress - preparing parameters
        self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")

        documentList = _coerceDocumentList(parameters.get("documentList"))

        # Optional: if omitted, formats determined from prompt. Default "txt" is validation fallback only.
        resultType = parameters.get("resultType")
        simpleMode = parameters.get("simpleMode", False)

        # Handle optional resultType: if None, formats determined from prompt by AI
        if resultType:
            normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
            output_extension = f".{normalized_result_type}"
            output_format = output_extension.replace('.', '') or 'txt'
            logger.info(f"Using result type: {resultType} -> {output_extension}, simpleMode: {simpleMode}")
        else:
            # No format specified - AI will determine formats from prompt
            normalized_result_type = None
            output_extension = None
            output_format = None
            logger.debug("resultType not provided - formats will be determined from prompt by AI")

        # Fallback only: the service-provided mimeType is preferred when available.
        output_mime_type = "application/octet-stream"

        # Phase 7.3: Pass both documentList and contentParts to AI service
        # (Extraction logic removed - handled by AI service)
        contentParts = _resolveContentParts(parameters)

        # Update progress - preparing AI call
        self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")

        if simpleMode:
            # Simple mode: fast path without document generation pipeline
            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI (simple mode)")

            context_text = _collectSimpleModeContext(self, documentList)

            # Use direct AI call without document generation pipeline
            request = AiCallRequest(
                prompt=aiPrompt,
                context=context_text if context_text else None,
                options=AiCallOptions(
                    # Simple mode always sends a concrete format; "txt" if none was requested.
                    resultFormat=output_format or 'txt',
                    operationType=OperationTypeEnum.DATA_ANALYSE,
                    processingMode=ProcessingModeEnum.BASIC
                )
            )

            aiResponse_obj = await self.services.ai.callAi(request)

            # Convert AiCallResponse to AiResponse format
            from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata
            aiResponse = AiResponse(
                content=aiResponse_obj.content,
                metadata=AiResponseMetadata(
                    additionalData={
                        "modelName": aiResponse_obj.modelName,
                        "priceUsd": aiResponse_obj.priceUsd,
                        "processingTime": aiResponse_obj.processingTime,
                        "bytesSent": aiResponse_obj.bytesSent,
                        "bytesReceived": aiResponse_obj.bytesReceived,
                        "errorCount": aiResponse_obj.errorCount
                    }
                ),
                documents=[]  # Simple mode doesn't generate documents
            )
        else:
            # Full mode: use unified callAiContent method
            # Detect image generation from resultType (if provided)
            imageFormats = ["png", "jpg", "jpeg", "gif", "webp"]
            isImageGeneration = normalized_result_type in imageFormats if normalized_result_type else False

            # Build options with correct operationType
            # resultFormat in options can be None - formats will be determined by AI if not provided
            options = AiCallOptions(
                resultFormat=output_format,  # Can be None - formats determined by AI
                operationType=OperationTypeEnum.IMAGE_GENERATE if isImageGeneration else OperationTypeEnum.DATA_GENERATE
            )

            # Get generationIntent from parameters (required for DATA_GENERATE)
            # Default to "document" if not provided (most common use case)
            # For code generation, use ai.generateCode action or explicitly pass generationIntent="code"
            generationIntent = parameters.get("generationIntent", "document")

            # Update progress - calling AI
            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")

            # Use unified callAiContent method with BOTH documentList and contentParts
            # Extraction is handled by AI service - no extraction here
            # outputFormat: Optional - if None, formats determined from prompt by AI
            logger.info(f"ai.process: Calling callAiContent with {len(documentList.references)} document references")
            if documentList.references:
                from modules.datamodels.datamodelDocref import DocumentListReference, DocumentItemReference
                for idx, ref in enumerate(documentList.references):
                    if isinstance(ref, DocumentItemReference):
                        logger.info(f" Passing reference {idx + 1}: documentId={ref.documentId}")
                    elif isinstance(ref, DocumentListReference):
                        logger.info(f" Passing reference {idx + 1}: label={ref.label}")
                    else:
                        logger.info(f" Passing reference {idx + 1}: {ref}")

            aiResponse = await self.services.ai.callAiContent(
                prompt=aiPrompt,
                options=options,
                documentList=documentList,  # Pass documentList - AI service handles extraction
                contentParts=contentParts,  # Pass contentParts if provided (or None)
                outputFormat=output_format,  # Can be None - AI determines from prompt
                parentOperationId=operationId,
                generationIntent=generationIntent  # REQUIRED for DATA_GENERATE
            )

        # Update progress - processing result
        self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")

        if aiResponse.documents:
            # Wrap every document returned by the AI service.
            final_documents = []
            for doc in aiResponse.documents:
                # A fresh metadata dict per document, so documents never share state.
                validationMetadata = {
                    "actionType": "ai.process",
                    "resultType": normalized_result_type,
                    "outputFormat": output_format,
                    "hasDocuments": True,
                    "documentCount": len(aiResponse.documents)
                }
                final_documents.append(ActionDocument(
                    documentName=doc.documentName,
                    documentData=doc.documentData,
                    mimeType=doc.mimeType or output_mime_type,
                    sourceJson=getattr(doc, 'sourceJson', None),  # Preserve source JSON for structure validation
                    validationMetadata=validationMetadata
                ))
        else:
            # Text response - create document from content
            # If no extension provided, use "txt" (required for filename)
            extension = output_extension.lstrip('.') if output_extension else "txt"
            meaningful_name = self._generateMeaningfulFileName(
                base_name="ai",
                extension=extension,
                action_name="result"
            )
            validationMetadata = {
                "actionType": "ai.process",
                "resultType": normalized_result_type,
                "outputFormat": output_format,
                "hasDocuments": False,
                "contentType": "text"
            }
            final_documents = [ActionDocument(
                documentName=meaningful_name,
                documentData=aiResponse.content,
                mimeType=output_mime_type,
                validationMetadata=validationMetadata
            )]

        # Complete progress tracking
        self.services.chat.progressLogFinish(operationId, True)

        return ActionResult.isSuccess(documents=final_documents)

    except Exception as e:
        # exc_info keeps the traceback; the message text itself is unchanged.
        logger.error(f"Error in AI processing: {str(e)}", exc_info=True)

        # Complete progress tracking with failure (best effort)
        _finishProgressQuietly(self, operationId, False)

        return ActionResult.isFailure(
            error=str(e)
        )
|
|
|