# gateway/modules/workflows/methods/methodAi/actions/process.py
# (viewer metadata: 2026-04-29 23:12:46 +02:00 · 367 lines · 18 KiB · Python)
# Copyright (c) 2025 Patrick Motsch
# All rights reserved.
import base64
import logging
import time
import json
import uuid
from typing import Dict, Any, List, Optional
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
from modules.datamodels.datamodelExtraction import ContentPart
from modules.serviceCenter.services.serviceSubscription.mainServiceSubscription import SubscriptionInactiveException
from modules.serviceCenter.services.serviceBilling.mainServiceBilling import BillingContextError
logger = logging.getLogger(__name__)
def _is_action_document_like(obj: Any) -> bool:
"""Check if object is ActionDocument-like (has documentData for inline workflow documents)."""
if obj is None:
return False
data = None
if isinstance(obj, dict):
data = obj.get("documentData") or obj.get("document_data")
else:
data = getattr(obj, "documentData", None) or getattr(obj, "document_data", None)
if data is None:
return False
if isinstance(data, bytes):
return len(data) > 0
if isinstance(data, str):
return len(data.strip()) > 0
return True
def _action_docs_to_content_parts(services, docs: List[Any]) -> List[ContentPart]:
    """Extract content from ActionDocument-like objects in memory (no persistence).

    Decodes base64 payloads, runs the extraction pipeline, and returns
    ContentParts for the AI call.

    Args:
        services: service container; only ``services.extraction`` is used.
        docs: list of ActionDocument-like dicts OR objects (see
            ``_is_action_document_like``).

    Returns:
        Flat list of ContentParts with non-empty data (image parts are kept
        even without data); empty list when no extraction service exists.
    """
    from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy
    all_parts = []
    extraction = getattr(services, "extraction", None)
    if not extraction:
        logger.warning("ai.process: No extraction service - cannot extract from inline documents")
        return []
    opts = ExtractionOptions(prompt="", mergeStrategy=MergeStrategy())
    for i, doc in enumerate(docs):
        # BUG FIX: the original read name/mime via doc.get() unconditionally,
        # which raised AttributeError for non-dict ActionDocument objects even
        # though the payload lookup supported both shapes. Branch all three
        # lookups on the doc type.
        if isinstance(doc, dict):
            raw = doc.get("documentData") or doc.get("document_data")
            name = doc.get("documentName") or doc.get("fileName") or f"document_{i}"
            mime = doc.get("mimeType") or "application/octet-stream"
        else:
            raw = getattr(doc, "documentData", None) or getattr(doc, "document_data", None)
            name = getattr(doc, "documentName", None) or getattr(doc, "fileName", None) or f"document_{i}"
            mime = getattr(doc, "mimeType", None) or "application/octet-stream"
        if not raw:
            continue
        if isinstance(raw, str):
            # Strings are normally base64; fall back to treating the string
            # itself as UTF-8 content when strict decoding fails.
            try:
                content = base64.b64decode(raw, validate=True)
            except Exception:
                content = raw.encode("utf-8")
        else:
            content = raw if isinstance(raw, bytes) else bytes(raw)
        ec = extraction.extractContentFromBytes(
            documentBytes=content,
            fileName=name,
            mimeType=mime,
            documentId=str(uuid.uuid4()),
            options=opts,
        )
        for p in ec.parts:
            # Keep parts that carry data; image parts are kept regardless.
            if p.data or getattr(p, "typeGroup", "") == "image":
                p.metadata.setdefault("originalFileName", name)
                all_parts.append(p)
        logger.info(f"ai.process: Extracted {len(ec.parts)} parts from {name} (no persistence)")
    return all_parts
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
    """Run an AI processing step for a workflow node and wrap the output as documents.

    Configuration is read from *parameters*:
      - aiPrompt (required): prompt text; missing/empty yields a failure result.
      - documentList (optional): inline ActionDocument-like dicts (content
        extracted in memory, never persisted) OR anything the tolerant
        coercer can turn into a DocumentReferenceList.
      - contentParts (optional): pre-extracted ContentParts; ignored when
        inline ActionDocuments were supplied (inline parts take precedence).
      - resultType (optional): output extension (e.g. "txt", "json"); when
        omitted the AI service determines the format from the prompt.
      - simpleMode (optional, default False): fast path via callAi, no
        document generation pipeline.
      - context, parentOperationId, allowedModels, requireNeutralization,
        generationIntent: further tuning knobs (see inline comments).

    Returns:
        ActionResult: success carrying a list of ActionDocuments, or failure
        with an error string.

    Raises:
        SubscriptionInactiveException, BillingContextError: re-raised so the
        workflow engine can react to billing problems; the progress-log entry
        is closed best-effort first. All other exceptions are converted into
        failure results.
    """
    operationId = None
    try:
        # Operation id combines workflow id + timestamp so progress entries
        # of concurrently running workflows do not collide.
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"ai_process_{workflowId}_{int(time.time())}"
        # Start progress tracking
        parentOperationId = parameters.get('parentOperationId')
        if not parentOperationId:
            logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
        else:
            logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")
        self.services.chat.progressLogStart(
            operationId,
            "Generate",
            "AI Processing",
            f"Format: {parameters.get('resultType', 'txt')}",
            parentOperationId=parentOperationId
        )
        aiPrompt = parameters.get("aiPrompt")
        logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
        # Update progress - preparing parameters
        self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")
        from modules.datamodels.datamodelDocref import (
            DocumentReferenceList,
            coerceDocumentReferenceList,
        )
        documentListParam = parameters.get("documentList")
        inline_content_parts: Optional[List[ContentPart]] = None
        # Inline ActionDocuments (SharePoint/email in automation2, no
        # persistence) are list[ActionDocument-like dict] -- handled
        # separately because they carry pre-extracted content. Everything
        # else is normalised through the tolerant coercer.
        # NOTE(review): only the FIRST element is probed -- assumes the list
        # is homogeneous; confirm with callers.
        is_inline = (
            isinstance(documentListParam, list)
            and len(documentListParam) > 0
            and _is_action_document_like(documentListParam[0])
        )
        if is_inline:
            inline_content_parts = _action_docs_to_content_parts(self.services, documentListParam)
            # Content travels as ContentParts; the reference list stays empty.
            documentList = DocumentReferenceList(references=[])
            logger.info(
                f"ai.process: Extracted {len(inline_content_parts)} ContentParts from {len(documentListParam)} inline ActionDocuments (no persistence)"
            )
        else:
            documentList = coerceDocumentReferenceList(documentListParam)
            logger.info(
                f"ai.process: Coerced documentList ({type(documentListParam).__name__}) "
                f"to DocumentReferenceList with {len(documentList.references)} references"
            )
        # Optional: if omitted, formats determined from prompt. Default "txt" is validation fallback only.
        resultType = parameters.get("resultType")
        simpleMode = parameters.get("simpleMode", False)
        if not aiPrompt:
            # Log enough context (parameter keys, document count) to diagnose
            # a misconfigured node without dumping full payloads.
            param_keys = list(parameters.keys()) if isinstance(parameters, dict) else []
            doc_count = len(parameters.get("documentList") or []) if isinstance(parameters.get("documentList"), (list, tuple)) else 0
            logger.error(f"aiPrompt is missing or empty. Parameter keys: {param_keys}, documentList: {doc_count} item(s)")
            return ActionResult.isFailure(
                error="AI prompt is required"
            )
        # Handle optional resultType: if None, formats determined from prompt by AI
        if resultType:
            # Normalise e.g. ".JSON" -> "json"; empty/dot-only input falls back to "txt".
            normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
            output_extension = f".{normalized_result_type}"
            output_format = output_extension.replace('.', '') or 'txt'
            logger.info(f"Using result type: {resultType} -> {output_extension}, simpleMode: {simpleMode}")
        else:
            # No format specified - AI will determine formats from prompt
            normalized_result_type = None
            output_extension = None
            output_format = None
            logger.debug("resultType not provided - formats will be determined from prompt by AI")
        # Unknown extensions fall back to text/plain.
        mimeMap = {"txt": "text/plain", "json": "application/json", "html": "text/html", "md": "text/markdown", "csv": "text/csv", "xml": "application/xml"}
        output_mime_type = mimeMap.get(normalized_result_type, "text/plain") if normalized_result_type else "text/plain"
        # Phase 7.3: Pass documentList and/or contentParts to AI service
        contentParts: Optional[List[ContentPart]] = inline_content_parts
        # The explicit contentParts parameter is only honoured when no inline
        # documents were extracted above.
        if "contentParts" in parameters and not inline_content_parts:
            contentPartsParam = parameters.get("contentParts")
            if contentPartsParam:
                if isinstance(contentPartsParam, list):
                    contentParts = contentPartsParam
                elif hasattr(contentPartsParam, 'parts'):
                    # Extract from ContentExtracted if it's an ActionDocument
                    contentParts = contentPartsParam.parts
                else:
                    logger.warning(f"Invalid contentParts type: {type(contentPartsParam)}, treating as empty")
                    contentParts = None
        # Update progress - preparing AI call
        self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")
        # Build output format for simple mode
        output_format_for_call = output_extension.replace('.', '') if output_extension else (output_format or 'txt')
        # Simple mode: fast path without document generation pipeline
        if simpleMode:
            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI (simple mode)")
            # Assemble plain-text context: explicit "context" parameter first,
            # then the raw file contents of any referenced documents.
            context_parts = []
            paramContext = parameters.get("context")
            if paramContext and isinstance(paramContext, str) and paramContext.strip():
                context_parts.append(paramContext.strip())
            if documentList and len(documentList.references) > 0:
                try:
                    documents = self.services.chat.getChatDocumentsFromDocumentList(documentList)
                    for doc in documents:
                        if hasattr(doc, 'fileId') and doc.fileId:
                            fileData = self.services.interfaceDbComponent.getFileData(doc.fileId)
                            if fileData:
                                # Lenient decode: undecodable bytes are dropped
                                # rather than failing the whole call.
                                if isinstance(fileData, bytes):
                                    doc_text = fileData.decode('utf-8', errors='ignore')
                                else:
                                    doc_text = str(fileData)
                                context_parts.append(doc_text)
                except Exception as e:
                    # Best effort: a broken document must not abort simple mode.
                    logger.warning(f"Error extracting context from documents in simple mode: {e}")
            context_text = "\n\n".join(context_parts) if context_parts else ""
            request = AiCallRequest(
                prompt=aiPrompt,
                context=context_text if context_text else None,
                options=AiCallOptions(
                    resultFormat=output_format_for_call,
                    operationType=OperationTypeEnum.DATA_ANALYSE,
                    processingMode=ProcessingModeEnum.BASIC
                )
            )
            from modules.workflows.methods.methodAi._common import applyCommonAiParams
            applyCommonAiParams(parameters, request)
            aiResponse_obj = await self.services.ai.callAi(request)
            # Convert AiCallResponse to AiResponse format
            from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata
            aiResponse = AiResponse(
                content=aiResponse_obj.content,
                metadata=AiResponseMetadata(
                    additionalData={
                        "modelName": aiResponse_obj.modelName,
                        "priceCHF": aiResponse_obj.priceCHF,
                        "processingTime": aiResponse_obj.processingTime,
                        "bytesSent": aiResponse_obj.bytesSent,
                        "bytesReceived": aiResponse_obj.bytesReceived,
                        "errorCount": aiResponse_obj.errorCount
                    }
                ),
                documents=[]  # Simple mode doesn't generate documents
            )
        else:
            # Full mode: use unified callAiContent method
            # Detect image generation from resultType (if provided)
            imageFormats = ["png", "jpg", "jpeg", "gif", "webp"]
            isImageGeneration = normalized_result_type in imageFormats if normalized_result_type else False
            # Build options with correct operationType
            # resultFormat in options can be None - formats will be determined by AI if not provided
            options = AiCallOptions(
                resultFormat=output_format,  # Can be None - formats determined by AI
                operationType=OperationTypeEnum.IMAGE_GENERATE if isImageGeneration else OperationTypeEnum.DATA_GENERATE
            )
            # Apply node-level AI params (allowedModels, requireNeutralization)
            allowedModels = parameters.get("allowedModels")
            if allowedModels and isinstance(allowedModels, list):
                options.allowedModels = allowedModels
            requireNeutralization = parameters.get("requireNeutralization")
            if requireNeutralization is not None:
                # The neutralization flag lives on the service context, not on
                # the options object.
                _ctx = getattr(self.services, '_context', None)
                if _ctx:
                    _ctx.requireNeutralization = bool(requireNeutralization)
            # Get generationIntent from parameters (required for DATA_GENERATE)
            # Default to "document" if not provided (most common use case)
            # For code generation, use ai.generateCode action or explicitly pass generationIntent="code"
            generationIntent = parameters.get("generationIntent", "document")
            paramContext = parameters.get("context")
            if paramContext and isinstance(paramContext, str) and paramContext.strip():
                # Full mode inlines the extra context into the prompt itself.
                aiPrompt = f"{aiPrompt}\n\n--- DATA CONTEXT ---\n{paramContext.strip()}"
            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")
            # Use unified callAiContent method with BOTH documentList and contentParts
            # Extraction is handled by AI service - no extraction here
            # outputFormat: Optional - if None, formats determined from prompt by AI
            # Note: ContentExtracted documents (from context.extractContent) are now handled
            # automatically in _extractAndPrepareContent() (Phase 5B)
            logger.info(f"ai.process: Calling callAiContent with {len(documentList.references)} document references")
            if documentList.references:
                # Diagnostic logging only: report what kind of references go out.
                from modules.datamodels.datamodelDocref import DocumentListReference, DocumentItemReference
                for idx, ref in enumerate(documentList.references):
                    if isinstance(ref, DocumentItemReference):
                        logger.info(f" Passing reference {idx + 1}: documentId={ref.documentId}")
                    elif isinstance(ref, DocumentListReference):
                        logger.info(f" Passing reference {idx + 1}: label={ref.label}")
                    else:
                        logger.info(f" Passing reference {idx + 1}: {ref}")
            aiResponse = await self.services.ai.callAiContent(
                prompt=aiPrompt,
                options=options,
                documentList=documentList if not inline_content_parts else None,  # Skip if using inline extracted parts
                contentParts=contentParts,  # From inline ActionDocuments (extracted in memory) or parameters
                outputFormat=output_format,  # Can be None - AI determines from prompt
                parentOperationId=operationId,
                generationIntent=generationIntent  # REQUIRED for DATA_GENERATE
            )
        # Update progress - processing result
        self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")
        # Extract documents from AiResponse
        if aiResponse.documents and len(aiResponse.documents) > 0:
            # Wrap each generated document, tagging it with validation
            # metadata so downstream validators know its provenance.
            action_documents = []
            for doc in aiResponse.documents:
                validationMetadata = {
                    "actionType": "ai.process",
                    "resultType": normalized_result_type,
                    "outputFormat": output_format,
                    "hasDocuments": True,
                    "documentCount": len(aiResponse.documents)
                }
                action_documents.append(ActionDocument(
                    documentName=doc.documentName,
                    documentData=doc.documentData,
                    mimeType=doc.mimeType or output_mime_type,
                    sourceJson=getattr(doc, 'sourceJson', None),  # Preserve source JSON for structure validation
                    validationMetadata=validationMetadata
                ))
            final_documents = action_documents
        else:
            # Text response - create document from content
            # If no extension provided, use "txt" (required for filename)
            extension = output_extension.lstrip('.') if output_extension else "txt"
            meaningful_name = self._generateMeaningfulFileName(
                base_name="ai",
                extension=extension,
                action_name="result"
            )
            validationMetadata = {
                "actionType": "ai.process",
                "resultType": normalized_result_type if normalized_result_type else None,
                "outputFormat": output_format if output_format else None,
                "hasDocuments": False,
                "contentType": "text"
            }
            action_document = ActionDocument(
                documentName=meaningful_name,
                documentData=aiResponse.content,
                mimeType=output_mime_type,
                validationMetadata=validationMetadata
            )
            final_documents = [action_document]
        # Complete progress tracking
        self.services.chat.progressLogFinish(operationId, True)
        return ActionResult.isSuccess(documents=final_documents)
    except (SubscriptionInactiveException, BillingContextError):
        # Billing/subscription problems must reach the engine; close the
        # progress entry best-effort, then re-raise unchanged.
        try:
            if operationId:
                self.services.chat.progressLogFinish(operationId, False)
        except Exception:
            pass
        raise
    except Exception as e:
        # Any other error becomes a failure result (node keeps running the
        # workflow's error path instead of crashing the engine).
        logger.error(f"Error in AI processing: {str(e)}")
        try:
            if operationId:
                self.services.chat.progressLogFinish(operationId, False)
        except Exception:
            pass
        return ActionResult.isFailure(
            error=str(e)
        )