gateway/modules/workflows/methods/methodAi.py
2025-09-30 18:30:33 +02:00

296 lines
15 KiB
Python

"""
AI processing method module.
Handles direct AI calls for any type of task.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
logger = logging.getLogger(__name__)
class MethodAi(MethodBase):
    """AI processing methods.

    Provides the ``process`` workflow action, which sends a prompt (and
    optionally the referenced documents) to the AI service and wraps the
    answer in an ``ActionResult`` document.
    """

    def __init__(self, services):
        """Initialise the base method and set this method's name and description."""
        super().__init__(services)
        self.name = "ai"
        self.description = "AI processing methods"

    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    @staticmethod
    def _resolve_output_format(expectedDocumentFormat: Any) -> tuple[str, str]:
        """Return ``(extension, mimeType)`` for the requested output format.

        Accepts either a dict carrying ``extension`` / ``mimeType`` keys or a
        bare extension string such as ``"json"`` or ``".json"``. Anything else
        (including an empty value) yields the defaults ``".txt"`` and
        ``"text/plain"``. The returned extension is lower-cased and always
        starts with a dot, because the rest of the method compares it against
        literals such as ``".json"`` and appends it to the file name.
        """
        import mimetypes

        extension: Any = ".txt"
        mime_type: Any = "text/plain"

        if isinstance(expectedDocumentFormat, dict):
            extension = expectedDocumentFormat.get("extension") or extension
            mime_type = expectedDocumentFormat.get("mimeType") or mime_type
        elif isinstance(expectedDocumentFormat, str) and expectedDocumentFormat.strip():
            candidate = expectedDocumentFormat.strip()
            if candidate.lstrip(".").isalnum():
                extension = candidate
                guessed, _ = mimetypes.guess_type("file." + candidate.lstrip(".").lower())
                mime_type = guessed or mime_type
            else:
                logger.warning(
                    "Unrecognised expectedDocumentFormat %r; falling back to .txt",
                    expectedDocumentFormat,
                )
        elif expectedDocumentFormat:
            logger.warning(
                "Unsupported expectedDocumentFormat type %s; falling back to .txt",
                type(expectedDocumentFormat).__name__,
            )

        extension = str(extension).strip().lower()
        if not extension.startswith("."):
            extension = "." + extension
        return extension, mime_type

    @staticmethod
    def _parts_to_text(parts) -> str:
        """Join the data of all text/table/structure parts with blank lines."""
        chunks: List[str] = []
        for part in parts or []:
            try:
                if getattr(part, "typeGroup", "") in ("text", "table", "structure") and getattr(part, "data", None):
                    chunks.append(part.data)
            except Exception:
                # Best effort: a part that cannot be inspected is skipped.
                continue
        return "\n\n".join(chunks)

    @staticmethod
    def _format_instruction(output_extension: str) -> str:
        """Return the prompt suffix that forces raw output for non-text formats."""
        if output_extension == ".txt":
            return ""
        if output_extension == ".csv":
            return "\n\nCRITICAL: Deliver the result as pure CSV data without any markdown formatting, code blocks, or additional text. Output only the CSV content with proper headers and data rows."
        if output_extension == ".json":
            return "\n\nCRITICAL: Deliver the result as pure JSON data without any markdown formatting, code blocks, or additional text. Output only the JSON content."
        if output_extension == ".xml":
            return "\n\nCRITICAL: Deliver the result as pure XML data without any markdown formatting, code blocks, or additional text. Output only the XML content."
        return f"\n\nCRITICAL: Deliver the result as pure {output_extension.upper()} data without any markdown formatting, code blocks, or additional text."

    @staticmethod
    def _inspect_json_result(result) -> tuple[str, bool]:
        """Strip markdown fences from ``result`` and decide whether to retry.

        Returns ``(cleaned, needs_retry)``. A retry is requested when the
        cleaned text is not valid JSON, or when it parses to a dict with at
        most two keys (heuristic for an under-filled answer).
        """
        import json

        cleaned = (result or "").strip()
        if cleaned.startswith("```json"):
            cleaned = cleaned[7:]
        if cleaned.endswith("```"):
            cleaned = cleaned[:-3]
        cleaned = cleaned.strip()

        try:
            parsed = json.loads(cleaned)
        except (ValueError, RecursionError):
            return cleaned, True
        return cleaned, isinstance(parsed, dict) and len(parsed) <= 2

    async def _build_document_context(
        self,
        chatDocuments,
        aiPrompt: str,
        processingMode: str,
        includeMetadata: bool,
        output_extension: str,
    ) -> str:
        """Extract readable text from ``chatDocuments`` and format it as context.

        Every document contributes one section: a content excerpt when the
        extraction service returned readable parts, otherwise a placeholder
        line. Extraction failures are logged and treated as "no content".
        """
        # Build batch payload for extraction
        batch_docs = []
        for doc in chatDocuments:
            try:
                file_bytes = self.services.workflow.getFileData(doc.fileId) if hasattr(doc, "fileId") else None
            except Exception as exc:
                logger.warning("Could not load file data for document context: %s", exc)
                file_bytes = None
            batch_docs.append({
                "id": getattr(doc, "id", None),
                "bytes": file_bytes or b"",
                "fileName": getattr(doc, "fileName", "unknown"),
                "mimeType": getattr(doc, "mimeType", None) or "application/octet-stream",
            })

        extraction_prompt = (
            f"Extract content for AI task context. Task: {aiPrompt}. Mode: {processingMode}."
        )
        try:
            extracted_list = await self.services.extraction.extractContentFromDocuments(
                prompt=extraction_prompt,
                documents=batch_docs,
                options={"ai": {"enabled": False}, "mergeStrategy": {}},
            )
        except Exception as exc:
            logger.warning("Document extraction failed; continuing without content: %s", exc)
            extracted_list = []

        # Excerpt length depends only on the mode, so compute it once.
        if processingMode == "detailed":
            base_length = 5000
        elif processingMode == "advanced":
            base_length = 3000
        else:
            base_length = 2000

        context_parts = []
        for index, doc in enumerate(chatDocuments):
            file_info = self.services.workflow.getFileInfo(doc.fileId)
            content = ""
            try:
                # Extraction results are matched to documents by position.
                extracted = extracted_list[index] if index < len(extracted_list) else None
                if extracted:
                    content = self._parts_to_text(getattr(extracted, "parts", []))
            except Exception as exc:
                logger.warning("Could not read extracted content for a document: %s", exc)
                content = ""

            if not content.strip():
                context_parts.append(f"Document: {doc.fileName} [No readable text content - binary file]")
                continue

            metadata_info = ""
            if file_info and includeMetadata:
                metadata_info = f" (Size: {file_info.get('fileSize', 'unknown')}, Type: {file_info.get('mimeType', 'unknown')})"
            if processingMode == "detailed":
                context_parts.append(
                    f"Document: {doc.fileName}{metadata_info}\nRelevance to AI Task: This document contains content directly related to '{aiPrompt[:100]}...'\nContent:\n{content[:base_length]}..."
                )
            else:
                context_parts.append(
                    f"Document: {doc.fileName}{metadata_info}\nContent:\n{content[:base_length]}..."
                )

        if not context_parts:
            return ""

        context_header = (
            "\n=== DOCUMENT CONTEXT FOR AI PROCESSING ===\n"
            f"AI Task: {aiPrompt[:100]}...\n"
            f"Processing Mode: {processingMode}\n"
            f"Expected Output Format: {output_extension.upper()}\n"
            f"Total Documents: {len(chatDocuments)}\n"
            "The following documents contain content relevant to your task.\n"
            "Use this information to provide the most accurate and helpful response.\n"
            "================================================\n"
        )
        logger.info(f"Included {len(chatDocuments)} documents in AI context with task-specific extraction")
        return context_header + "\n\n" + "\n\n".join(context_parts)

    def _collect_ai_documents(self, chatDocuments) -> list:
        """Wrap each chat document's file data for the AI call.

        Documents that cannot be loaded or wrapped are skipped with a warning
        so that one bad file does not abort the whole call.
        """
        documents = []
        try:
            for doc in chatDocuments or []:
                try:
                    file_data = self.services.workflow.getFileData(doc.fileId)
                    # NOTE(review): ChatDocument is not among the imports
                    # visible at the top of this file - confirm it is in
                    # scope. If it is not, this line raises NameError for
                    # every document and no documents reach the AI call.
                    documents.append(
                        ChatDocument(
                            fileData=file_data,
                            fileName=doc.fileName,
                            mimeType=doc.mimeType,
                        )
                    )
                except Exception as exc:
                    logger.warning("Skipping document for AI call: %s", exc)
        except Exception as exc:
            logger.warning("Could not iterate documents for AI call: %s", exc)
            return []
        return documents

    # process() returns ActionResult.isSuccess with a single document dict
    # (documentName, documentData{result, fileName, processedDocuments},
    # mimeType), or ActionResult.isFailure with an error message when the
    # prompt is missing or any step raises.
    # expectedDocumentFormat is read as a dict with "extension"/"mimeType"
    # keys; a bare extension string is accepted as well.
    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        Perform an AI call for any type of task with optional document references

        Parameters:
            aiPrompt (str): The AI prompt for processing
            documentList (list, optional): List of document references to include in context
            expectedDocumentFormat (str, optional): Expected document output format with extension, mimeType, description
            processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic')
            includeMetadata (bool, optional): Whether to include metadata (default: True)
            operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation'
            priority (str, optional): Priority level - use 'speed', 'quality', 'cost', or 'balanced'
            maxCost (float, optional): Maximum cost budget for the AI call
            maxProcessingTime (int, optional): Maximum processing time in seconds
            requiredTags (list, optional): Required model tags - use 'text', 'chat', 'reasoning', 'analysis', 'image', 'vision', 'web', 'search', etc.
            customInstructions (str, optional): Additional instructions appended to the prompt
        """
        try:
            aiPrompt = parameters.get("aiPrompt")
            documentList = parameters.get("documentList", [])
            if isinstance(documentList, str):
                documentList = [documentList]
            expectedDocumentFormat = parameters.get("expectedDocumentFormat", "")
            processingMode = parameters.get("processingMode", "basic")
            includeMetadata = parameters.get("includeMetadata", True)
            operationType = parameters.get("operationType", "general")
            priority = parameters.get("priority", "balanced")
            maxCost = parameters.get("maxCost")
            maxProcessingTime = parameters.get("maxProcessingTime")
            requiredTags = parameters.get("requiredTags")
            # Previously read without ever being assigned, which made every
            # call fail with a NameError.
            customInstructions = parameters.get("customInstructions")

            if not aiPrompt:
                return ActionResult.isFailure(
                    error="AI prompt is required"
                )

            # Determine output format first (needed for context building)
            output_extension, output_mime_type = self._resolve_output_format(expectedDocumentFormat)
            if expectedDocumentFormat:
                logger.info(f"Using expected format: {output_extension} ({output_mime_type})")

            # Build context from documents if provided
            # NOTE(review): the textual context is currently only used for
            # logging; it is not passed to callAi. Confirm whether it should
            # be appended to the prompt.
            context = ""
            chatDocuments = []
            if documentList:
                chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
                if chatDocuments:
                    context = await self._build_document_context(
                        chatDocuments,
                        aiPrompt,
                        processingMode,
                        includeMetadata,
                        output_extension,
                    )

            # Build enhanced prompt
            enhanced_prompt = aiPrompt
            # Add processing mode instructions if specified (generic, not analysis-specific)
            if processingMode == "detailed":
                enhanced_prompt += "\n\nPlease provide a detailed response with comprehensive information."
            elif processingMode == "advanced":
                enhanced_prompt += "\n\nPlease provide an advanced response with deep insights."
            # Add custom instructions if provided
            if customInstructions:
                enhanced_prompt += f"\n\nAdditional Instructions: {customInstructions}"
            # Add format-specific instructions only if non-text format is requested
            enhanced_prompt += self._format_instruction(output_extension)

            logger.info(f"Executing AI call with mode: {processingMode}, prompt length: {len(enhanced_prompt)}")
            if context:
                logger.info(f"Including context from {len(documentList)} documents")

            # Encourage longer, structured outputs with a min-length hint
            min_tokens_hint = "\n\nPlease ensure the response is substantial and complete."
            call_prompt = enhanced_prompt + min_tokens_hint

            # Centralized AI call with optional document context
            documents = self._collect_ai_documents(chatDocuments) if documentList else []
            output_format = output_extension.replace('.', '') or 'txt'

            # Build options using new AiCallOptions format
            options = AiCallOptions(
                operationType=operationType,
                priority=priority,
                compressPrompt=processingMode != "detailed",
                compressContext=True,
                processDocumentsIndividually=True,
                processingMode=processingMode,
                resultFormat=output_format,
                maxCost=maxCost,
                maxProcessingTime=maxProcessingTime,
                requiredTags=requiredTags
            )
            result = await self.services.ai.callAi(
                prompt=call_prompt,
                documents=documents or None,
                options=options
            )

            # If expected JSON and too short/not JSON, retry with stricter JSON guardrails
            if output_extension == ".json":
                cleaned, needs_retry = self._inspect_json_result(result)
                if needs_retry:
                    guardrail_prompt = (
                        enhanced_prompt
                        + "\n\nCRITICAL: Return ONLY valid JSON, no markdown, no code fences. "
                        "Include all requested fields with detailed content."
                    )
                    try:
                        result = await self.services.ai.callAi(
                            prompt=guardrail_prompt,
                            # Same document list as the first attempt (the
                            # text context string is not a document list).
                            documents=documents or None,
                            options=AiCallOptions(
                                operationType=OperationType.GENERATE_CONTENT,
                                priority=Priority.QUALITY,
                                compressPrompt=False,
                                compressContext=True,
                                processDocumentsIndividually=True,
                                processingMode="detailed",
                                resultFormat="json",
                                maxCost=0.03,
                                maxProcessingTime=30
                            )
                        )
                    except Exception as exc:
                        logger.warning("JSON guardrail retry failed; using first attempt: %s", exc)
                        result = cleaned  # fallback to first attempt

            # Create result document
            fileName = f"ai_{processingMode}_{self._format_timestamp_for_filename()}{output_extension}"

            # Return result in the standard ActionResult format
            return ActionResult.isSuccess(
                documents=[{
                    "documentName": fileName,
                    "documentData": {
                        "result": result,
                        "fileName": fileName,
                        "processedDocuments": len(documentList) if documentList else 0
                    },
                    "mimeType": output_mime_type
                }]
            )
        except Exception as e:
            # Top-level boundary: report the failure to the workflow engine
            # instead of raising, but keep the traceback in the log.
            logger.exception(f"Error in AI processing: {str(e)}")
            return ActionResult.isFailure(
                error=str(e)
            )