355 lines
18 KiB
Python
355 lines
18 KiB
Python
# Copyright (c) 2025 Patrick Motsch
|
||
# All rights reserved.
|
||
|
||
import base64
|
||
import logging
|
||
import time
|
||
import json
|
||
import uuid
|
||
from typing import Dict, Any, List, Optional
|
||
from modules.datamodels.datamodelChat import ActionResult, ActionDocument
|
||
from modules.datamodels.datamodelAi import AiCallRequest, AiCallOptions, OperationTypeEnum, ProcessingModeEnum
|
||
from modules.datamodels.datamodelExtraction import ContentPart
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _is_action_document_like(obj: Any) -> bool:
|
||
"""Check if object is ActionDocument-like (has documentData for inline workflow documents)."""
|
||
if obj is None:
|
||
return False
|
||
data = None
|
||
if isinstance(obj, dict):
|
||
data = obj.get("documentData") or obj.get("document_data")
|
||
else:
|
||
data = getattr(obj, "documentData", None) or getattr(obj, "document_data", None)
|
||
if data is None:
|
||
return False
|
||
if isinstance(data, bytes):
|
||
return len(data) > 0
|
||
if isinstance(data, str):
|
||
return len(data.strip()) > 0
|
||
return True
|
||
|
||
|
||
def _action_docs_to_content_parts(services, docs: List[Any]) -> List[ContentPart]:
    """Extract content from ActionDocument-like objects in memory (no persistence).

    Decodes base64 payloads, runs the extraction pipeline, and returns
    ContentParts suitable for an AI call.

    Args:
        services: Service container; only ``services.extraction`` is used.
        docs: ActionDocument-like items — plain dicts or objects exposing
            ``documentData``/``document_data`` (and optionally
            ``documentName``/``fileName`` and ``mimeType``) as keys or
            attributes.

    Returns:
        ContentParts that carry data (image parts are kept even without
        data); empty list when no extraction service is available.
    """
    extraction = getattr(services, "extraction", None)
    if not extraction:
        logger.warning("ai.process: No extraction service - cannot extract from inline documents")
        return []

    # Lazy project import placed after the guard so the no-extraction path
    # does not require the extraction datamodel at all.
    from modules.datamodels.datamodelExtraction import ExtractionOptions, MergeStrategy

    opts = ExtractionOptions(prompt="", mergeStrategy=MergeStrategy())
    all_parts: List[ContentPart] = []
    for i, doc in enumerate(docs):
        # BUGFIX: name/mime were previously read with doc.get(...)
        # unconditionally, which raised AttributeError for attribute-style
        # (non-dict) ActionDocuments even though the raw-data read below
        # supported them. Branch on the document shape for all three fields.
        if isinstance(doc, dict):
            raw = doc.get("documentData") or doc.get("document_data")
            name = doc.get("documentName") or doc.get("fileName") or f"document_{i}"
            mime = doc.get("mimeType") or "application/octet-stream"
        else:
            raw = getattr(doc, "documentData", None) or getattr(doc, "document_data", None)
            name = getattr(doc, "documentName", None) or getattr(doc, "fileName", None) or f"document_{i}"
            mime = getattr(doc, "mimeType", None) or "application/octet-stream"
        if not raw:
            continue

        # documentData may be base64 text, plain text, or raw bytes.
        if isinstance(raw, str):
            try:
                content = base64.b64decode(raw, validate=True)
            except Exception:
                # Not valid base64 — treat the string as a plain-text payload.
                content = raw.encode("utf-8")
        else:
            content = raw if isinstance(raw, bytes) else bytes(raw)

        ec = extraction.extractContentFromBytes(
            documentBytes=content,
            fileName=name,
            mimeType=mime,
            documentId=str(uuid.uuid4()),
            options=opts,
        )
        # Keep parts with data; image parts are kept even when data is empty.
        for p in ec.parts:
            if p.data or getattr(p, "typeGroup", "") == "image":
                p.metadata.setdefault("originalFileName", name)
                all_parts.append(p)
        logger.info(f"ai.process: Extracted {len(ec.parts)} parts from {name} (no persistence)")
    return all_parts
|
||
|
||
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
    """Execute the ai.process action: call the AI with a prompt and optional documents.

    Reads from ``parameters``: ``aiPrompt`` (required), ``documentList``,
    ``contentParts``, ``resultType``, ``simpleMode``, ``context``,
    ``generationIntent`` and ``parentOperationId``. Normalizes the document
    input into a DocumentReferenceList (or extracts inline ActionDocuments
    in memory), then either takes the fast "simple mode" path
    (``services.ai.callAi``) or the full ``callAiContent`` pipeline, and
    wraps the resulting documents in an ActionResult.

    Returns:
        ActionResult.isSuccess with the generated documents, or
        ActionResult.isFailure when ``aiPrompt`` is missing or any
        exception occurs (the error message is passed through).
    """
    # Initialized before the try so the except block can safely finish
    # progress tracking even if the failure happened before assignment.
    operationId = None
    try:
        workflowId = self.services.workflow.id if self.services.workflow else f"no-workflow-{int(time.time())}"
        operationId = f"ai_process_{workflowId}_{int(time.time())}"

        # Start progress tracking. Without a parentOperationId the operation
        # appears at root level in the progress log.
        parentOperationId = parameters.get('parentOperationId')
        if not parentOperationId:
            logger.warning(f"ai.process: No parentOperationId provided in parameters. Operation '{operationId}' will appear at root level. Available parameters: {list(parameters.keys())}")
        else:
            logger.debug(f"ai.process: Using parentOperationId '{parentOperationId}' for operation '{operationId}'")

        self.services.chat.progressLogStart(
            operationId,
            "Generate",
            "AI Processing",
            f"Format: {parameters.get('resultType', 'txt')}",
            parentOperationId=parentOperationId
        )

        aiPrompt = parameters.get("aiPrompt")
        logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")

        # Update progress - preparing parameters
        self.services.chat.progressLogUpdate(operationId, 0.2, "Preparing parameters")

        from modules.datamodels.datamodelDocref import DocumentReferenceList

        documentListParam = parameters.get("documentList")
        inline_content_parts: Optional[List[ContentPart]] = None

        # Handle inline ActionDocuments (e.g. from SharePoint/email in automation2 – no persistence)
        # NOTE: only the FIRST list item is probed; the list is assumed
        # homogeneous (all inline documents or none).
        is_inline = (
            isinstance(documentListParam, list)
            and len(documentListParam) > 0
            and _is_action_document_like(documentListParam[0])
        )
        if is_inline:
            inline_content_parts = _action_docs_to_content_parts(self.services, documentListParam)
            documentList = DocumentReferenceList(references=[])
            logger.info(
                f"ai.process: Extracted {len(inline_content_parts)} ContentParts from {len(documentListParam)} inline ActionDocuments (no persistence)"
            )
        elif documentListParam is None:
            documentList = DocumentReferenceList(references=[])
            logger.debug(f"ai.process: documentList is None, using empty DocumentReferenceList")
        elif isinstance(documentListParam, DocumentReferenceList):
            documentList = documentListParam
            logger.info(f"ai.process: Received DocumentReferenceList with {len(documentList.references)} references")
            for idx, ref in enumerate(documentList.references):
                logger.info(f" Reference {idx + 1}: documentId={ref.documentId}, type={type(ref).__name__}")
        elif isinstance(documentListParam, str):
            # A bare string is treated as a single document reference.
            documentList = DocumentReferenceList.from_string_list([documentListParam])
            logger.info(f"ai.process: Converted string to DocumentReferenceList with {len(documentList.references)} references")
        elif isinstance(documentListParam, list):
            # Plain list (no inline documentData detected) - presumably a
            # list of reference strings; verify against callers.
            first = documentListParam[0] if documentListParam else None
            logger.info(
                f"ai.process: documentList is list of {len(documentListParam)} items, "
                f"first type={type(first).__name__}, has_documentData={_is_action_document_like(first) if first else False}"
            )
            documentList = DocumentReferenceList.from_string_list(documentListParam)
            logger.info(f"ai.process: Converted list to DocumentReferenceList with {len(documentList.references)} references")
        else:
            # Unknown shape: log and continue with no documents rather than fail.
            logger.error(f"Invalid documentList type: {type(documentListParam)}")
            documentList = DocumentReferenceList(references=[])

        # Optional: if omitted, formats determined from prompt. Default "txt" is validation fallback only.
        resultType = parameters.get("resultType")
        simpleMode = parameters.get("simpleMode", False)

        if not aiPrompt:
            param_keys = list(parameters.keys()) if isinstance(parameters, dict) else []
            doc_count = len(parameters.get("documentList") or []) if isinstance(parameters.get("documentList"), (list, tuple)) else 0
            logger.error(f"aiPrompt is missing or empty. Parameter keys: {param_keys}, documentList: {doc_count} item(s)")
            return ActionResult.isFailure(
                error="AI prompt is required"
            )

        # Handle optional resultType: if None, formats determined from prompt by AI
        if resultType:
            # Normalize e.g. ".JSON" / "json" -> "json"; empty result falls back to "txt".
            normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
            output_extension = f".{normalized_result_type}"
            output_format = output_extension.replace('.', '') or 'txt'
            logger.info(f"Using result type: {resultType} -> {output_extension}, simpleMode: {simpleMode}")
        else:
            # No format specified - AI will determine formats from prompt
            normalized_result_type = None
            output_extension = None
            output_format = None
            logger.debug("resultType not provided - formats will be determined from prompt by AI")

        # Known text formats map to concrete MIME types; anything else
        # (including no resultType) falls back to text/plain.
        mimeMap = {"txt": "text/plain", "json": "application/json", "html": "text/html", "md": "text/markdown", "csv": "text/csv", "xml": "application/xml"}
        output_mime_type = mimeMap.get(normalized_result_type, "text/plain") if normalized_result_type else "text/plain"

        # Phase 7.3: Pass documentList and/or contentParts to AI service.
        # Inline-extracted parts take precedence over a contentParts parameter.
        contentParts: Optional[List[ContentPart]] = inline_content_parts
        if "contentParts" in parameters and not inline_content_parts:
            contentPartsParam = parameters.get("contentParts")
            if contentPartsParam:
                if isinstance(contentPartsParam, list):
                    contentParts = contentPartsParam
                elif hasattr(contentPartsParam, 'parts'):
                    # Extract from ContentExtracted if it's an ActionDocument
                    contentParts = contentPartsParam.parts
                else:
                    logger.warning(f"Invalid contentParts type: {type(contentPartsParam)}, treating as empty")
                    contentParts = None

        # Update progress - preparing AI call
        self.services.chat.progressLogUpdate(operationId, 0.4, "Preparing AI call")

        # Build output format for simple mode
        output_format_for_call = output_extension.replace('.', '') if output_extension else (output_format or 'txt')

        # Simple mode: fast path without document generation pipeline
        if simpleMode:
            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI (simple mode)")

            # Assemble a plain-text context: explicit "context" parameter
            # first, then the raw text of any referenced documents.
            context_parts = []
            paramContext = parameters.get("context")
            if paramContext and isinstance(paramContext, str) and paramContext.strip():
                context_parts.append(paramContext.strip())
            if documentList and len(documentList.references) > 0:
                try:
                    documents = self.services.chat.getChatDocumentsFromDocumentList(documentList)
                    for doc in documents:
                        if hasattr(doc, 'fileId') and doc.fileId:
                            fileData = self.services.interfaceDbComponent.getFileData(doc.fileId)
                            if fileData:
                                if isinstance(fileData, bytes):
                                    # Best-effort text view of the file; binary
                                    # content is lossily decoded.
                                    doc_text = fileData.decode('utf-8', errors='ignore')
                                else:
                                    doc_text = str(fileData)
                                context_parts.append(doc_text)
                except Exception as e:
                    # Context is best-effort in simple mode; continue without it.
                    logger.warning(f"Error extracting context from documents in simple mode: {e}")
            context_text = "\n\n".join(context_parts) if context_parts else ""

            request = AiCallRequest(
                prompt=aiPrompt,
                context=context_text if context_text else None,
                options=AiCallOptions(
                    resultFormat=output_format_for_call,
                    operationType=OperationTypeEnum.DATA_ANALYSE,
                    processingMode=ProcessingModeEnum.BASIC
                )
            )

            aiResponse_obj = await self.services.ai.callAi(request)

            # Convert AiCallResponse to AiResponse format
            from modules.datamodels.datamodelWorkflow import AiResponse, AiResponseMetadata
            aiResponse = AiResponse(
                content=aiResponse_obj.content,
                metadata=AiResponseMetadata(
                    additionalData={
                        "modelName": aiResponse_obj.modelName,
                        "priceCHF": aiResponse_obj.priceCHF,
                        "processingTime": aiResponse_obj.processingTime,
                        "bytesSent": aiResponse_obj.bytesSent,
                        "bytesReceived": aiResponse_obj.bytesReceived,
                        "errorCount": aiResponse_obj.errorCount
                    }
                ),
                documents=[]  # Simple mode doesn't generate documents
            )
        else:
            # Full mode: use unified callAiContent method
            # Detect image generation from resultType (if provided)
            imageFormats = ["png", "jpg", "jpeg", "gif", "webp"]
            isImageGeneration = normalized_result_type in imageFormats if normalized_result_type else False

            # Build options with correct operationType
            # resultFormat in options can be None - formats will be determined by AI if not provided
            options = AiCallOptions(
                resultFormat=output_format,  # Can be None - formats determined by AI
                operationType=OperationTypeEnum.IMAGE_GENERATE if isImageGeneration else OperationTypeEnum.DATA_GENERATE
            )

            # Get generationIntent from parameters (required for DATA_GENERATE)
            # Default to "document" if not provided (most common use case)
            # For code generation, use ai.generateCode action or explicitly pass generationIntent="code"
            generationIntent = parameters.get("generationIntent", "document")

            # In full mode the context parameter is appended to the prompt
            # rather than passed separately.
            paramContext = parameters.get("context")
            if paramContext and isinstance(paramContext, str) and paramContext.strip():
                aiPrompt = f"{aiPrompt}\n\n--- DATA CONTEXT ---\n{paramContext.strip()}"

            self.services.chat.progressLogUpdate(operationId, 0.6, "Calling AI")

            # Use unified callAiContent method with BOTH documentList and contentParts
            # Extraction is handled by AI service - no extraction here
            # outputFormat: Optional - if None, formats determined from prompt by AI
            # Note: ContentExtracted documents (from context.extractContent) are now handled
            # automatically in _extractAndPrepareContent() (Phase 5B)
            logger.info(f"ai.process: Calling callAiContent with {len(documentList.references)} document references")
            if documentList.references:
                from modules.datamodels.datamodelDocref import DocumentListReference, DocumentItemReference
                for idx, ref in enumerate(documentList.references):
                    if isinstance(ref, DocumentItemReference):
                        logger.info(f" Passing reference {idx + 1}: documentId={ref.documentId}")
                    elif isinstance(ref, DocumentListReference):
                        logger.info(f" Passing reference {idx + 1}: label={ref.label}")
                    else:
                        logger.info(f" Passing reference {idx + 1}: {ref}")

            aiResponse = await self.services.ai.callAiContent(
                prompt=aiPrompt,
                options=options,
                documentList=documentList if not inline_content_parts else None,  # Skip if using inline extracted parts
                contentParts=contentParts,  # From inline ActionDocuments (extracted in memory) or parameters
                outputFormat=output_format,  # Can be None - AI determines from prompt
                parentOperationId=operationId,
                generationIntent=generationIntent  # REQUIRED for DATA_GENERATE
            )

        # Update progress - processing result
        self.services.chat.progressLogUpdate(operationId, 0.8, "Processing result")

        # Extract documents from AiResponse
        if aiResponse.documents and len(aiResponse.documents) > 0:
            # Wrap each AI-produced document in an ActionDocument, tagging it
            # with validation metadata describing this action's request.
            action_documents = []
            for doc in aiResponse.documents:
                validationMetadata = {
                    "actionType": "ai.process",
                    "resultType": normalized_result_type,
                    "outputFormat": output_format,
                    "hasDocuments": True,
                    "documentCount": len(aiResponse.documents)
                }
                action_documents.append(ActionDocument(
                    documentName=doc.documentName,
                    documentData=doc.documentData,
                    mimeType=doc.mimeType or output_mime_type,
                    sourceJson=getattr(doc, 'sourceJson', None),  # Preserve source JSON for structure validation
                    validationMetadata=validationMetadata
                ))

            final_documents = action_documents
        else:
            # Text response - create document from content
            # If no extension provided, use "txt" (required for filename)
            extension = output_extension.lstrip('.') if output_extension else "txt"
            meaningful_name = self._generateMeaningfulFileName(
                base_name="ai",
                extension=extension,
                action_name="result"
            )
            validationMetadata = {
                "actionType": "ai.process",
                "resultType": normalized_result_type if normalized_result_type else None,
                "outputFormat": output_format if output_format else None,
                "hasDocuments": False,
                "contentType": "text"
            }
            action_document = ActionDocument(
                documentName=meaningful_name,
                documentData=aiResponse.content,
                mimeType=output_mime_type,
                validationMetadata=validationMetadata
            )
            final_documents = [action_document]

        # Complete progress tracking
        self.services.chat.progressLogFinish(operationId, True)

        return ActionResult.isSuccess(documents=final_documents)

    except Exception as e:
        logger.error(f"Error in AI processing: {str(e)}")

        # Best-effort: mark the operation as failed in the progress log,
        # but never let progress bookkeeping mask the original error.
        try:
            if operationId:
                self.services.chat.progressLogFinish(operationId, False)
        except Exception:
            pass

        return ActionResult.isFailure(
            error=str(e)
        )
|
||
|