233 lines
10 KiB
Python
233 lines
10 KiB
Python
"""
|
|
AI processing method module.
|
|
Handles direct AI calls for any type of task.
|
|
"""
|
|
|
|
import time
|
|
import logging
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime, UTC
|
|
|
|
from modules.workflows.methods.methodBase import MethodBase, action
|
|
from modules.datamodels.datamodelChat import ActionResult
|
|
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum, AiCallPromptImage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class MethodAi(MethodBase):
|
|
"""AI processing methods."""
|
|
|
|
def __init__(self, services):
|
|
super().__init__(services)
|
|
self.name = "ai"
|
|
self.description = "AI processing methods"
|
|
|
|
def _format_timestamp_for_filename(self) -> str:
|
|
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
|
|
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
|
|
|
|
|
|
@action
|
|
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
"""
|
|
GENERAL:
|
|
- Purpose: Process a user prompt with optional unlimited input documents to produce one or many output documents of the SAME format.
|
|
- Input requirements: aiPrompt (required); optional documentList.
|
|
- Output format: Exactly one file format to select. For multiple output file formats you need to do different calls.
|
|
|
|
Parameters:
|
|
- aiPrompt (str, required): Instruction for the AI.
|
|
- documentList (list, optional): Document reference(s) for context.
|
|
- resultType (str, optional): Output file extension - only one extension allowed (e.g. txt, json, md, csv, xml, html, pdf, docx, xlsx, png, ...). Default: txt.
|
|
"""
|
|
try:
|
|
# Init progress logger
|
|
workflowId = self.services.currentWorkflow.id if self.services.currentWorkflow else f"no-workflow-{int(time.time())}"
|
|
operationId = f"ai_process_{workflowId}_{int(time.time())}"
|
|
|
|
# Start progress tracking
|
|
self.services.workflow.progressLogStart(
|
|
operationId,
|
|
"Generate",
|
|
"AI Processing",
|
|
f"Format: {parameters.get('resultType', 'txt')}"
|
|
)
|
|
|
|
aiPrompt = parameters.get("aiPrompt")
|
|
logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")
|
|
|
|
# Update progress - preparing parameters
|
|
self.services.workflow.progressLogUpdate(operationId, 0.2, "Preparing parameters")
|
|
|
|
documentList = parameters.get("documentList", [])
|
|
if isinstance(documentList, str):
|
|
documentList = [documentList]
|
|
resultType = parameters.get("resultType", "txt")
|
|
|
|
|
|
if not aiPrompt:
|
|
logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
|
|
return ActionResult.isFailure(
|
|
error="AI prompt is required"
|
|
)
|
|
|
|
# Determine output extension and default MIME type without duplicating service logic
|
|
normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt")
|
|
output_extension = f".{normalized_result_type}"
|
|
output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available
|
|
logger.info(f"Using result type: {resultType} -> {output_extension}")
|
|
|
|
# Update progress - preparing documents
|
|
self.services.workflow.progressLogUpdate(operationId, 0.3, "Preparing documents")
|
|
|
|
# Get ChatDocuments for AI service - let AI service handle all document processing
|
|
chatDocuments = []
|
|
if documentList:
|
|
chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
|
|
if chatDocuments:
|
|
logger.info(f"Prepared {len(chatDocuments)} documents for AI processing")
|
|
|
|
# Update progress - preparing AI call
|
|
self.services.workflow.progressLogUpdate(operationId, 0.4, "Preparing AI call")
|
|
|
|
# Build options with only resultFormat - let service layer handle all other parameters
|
|
output_format = output_extension.replace('.', '') or 'txt'
|
|
options = AiCallOptions(
|
|
resultFormat=output_format
|
|
# Removed all model parameters - service layer will analyze prompt and determine optimal parameters
|
|
)
|
|
|
|
# Update progress - calling AI
|
|
self.services.workflow.progressLogUpdate(operationId, 0.6, "Calling AI")
|
|
|
|
result = await self.services.ai.callAiDocuments(
|
|
prompt=aiPrompt,
|
|
documents=chatDocuments if chatDocuments else None,
|
|
options=options,
|
|
outputFormat=output_format
|
|
)
|
|
|
|
# Update progress - processing result
|
|
self.services.workflow.progressLogUpdate(operationId, 0.8, "Processing result")
|
|
|
|
from modules.datamodels.datamodelChat import ActionDocument
|
|
|
|
if isinstance(result, dict) and isinstance(result.get("documents"), list):
|
|
action_documents = []
|
|
for d in result["documents"]:
|
|
action_documents.append(ActionDocument(
|
|
documentName=d.get("documentName"),
|
|
documentData=d.get("documentData"),
|
|
mimeType=d.get("mimeType") or output_mime_type
|
|
))
|
|
|
|
# Preserve structured content field for validation (if it exists)
|
|
# This allows validator to see the actual structured data, not just rendered output
|
|
if "content" in result and result["content"] and isinstance(result["content"], (dict, list)):
|
|
action_documents.append(ActionDocument(
|
|
documentName="structured_content.json",
|
|
documentData=result["content"],
|
|
mimeType="application/json"
|
|
))
|
|
|
|
final_documents = action_documents
|
|
else:
|
|
extension = output_extension.lstrip('.')
|
|
meaningful_name = self._generateMeaningfulFileName(
|
|
base_name="ai",
|
|
extension=extension,
|
|
action_name="result"
|
|
)
|
|
action_document = ActionDocument(
|
|
documentName=meaningful_name,
|
|
documentData=result,
|
|
mimeType=output_mime_type
|
|
)
|
|
final_documents = [action_document]
|
|
|
|
# Complete progress tracking
|
|
self.services.workflow.progressLogFinish(operationId, True)
|
|
|
|
return ActionResult.isSuccess(documents=final_documents)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in AI processing: {str(e)}")
|
|
|
|
# Complete progress tracking with failure
|
|
try:
|
|
self.services.workflow.progressLogFinish(operationId, False)
|
|
except:
|
|
pass # Don't fail on progress logging errors
|
|
|
|
return ActionResult.isFailure(
|
|
error=str(e)
|
|
)
|
|
|
|
|
|
@action
|
|
async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
|
|
"""
|
|
GENERAL:
|
|
- Purpose: Web research with two-step process: search for URLs, then crawl content.
|
|
- Input requirements: prompt (required); optional list(url), country, language, researchDepth.
|
|
- Output format: JSON with research results including URLs and content.
|
|
|
|
Parameters:
|
|
- prompt (str, required): Natural language research instruction.
|
|
- list(url) (list, optional): Specific URLs to crawl, if needed.
|
|
- country (str, optional): Two-digit country code (lowercase, e.g., ch, us, de).
|
|
- language (str, optional): Language code (lowercase, e.g., de, en, fr).
|
|
- researchDepth (str, optional): Research depth - fast, general, or deep. Default: general.
|
|
"""
|
|
try:
|
|
prompt = parameters.get("prompt")
|
|
if not prompt:
|
|
return ActionResult.isFailure(error="Research prompt is required")
|
|
|
|
# Init progress logger
|
|
operationId = f"web_research_{self.services.currentWorkflow.id}_{int(time.time())}"
|
|
|
|
# Start progress tracking
|
|
self.services.workflow.progressLogStart(
|
|
operationId,
|
|
"Web Research",
|
|
"Searching and Crawling",
|
|
"Extracting URLs and Content"
|
|
)
|
|
|
|
# Call webcrawl service - service handles all AI intention analysis and processing
|
|
result = await self.services.web.performWebResearch(
|
|
prompt=prompt,
|
|
urls=parameters.get("list(url)", []),
|
|
country=parameters.get("country"),
|
|
language=parameters.get("language"),
|
|
researchDepth=parameters.get("researchDepth", "general"),
|
|
operationId=operationId
|
|
)
|
|
|
|
# Complete progress tracking
|
|
self.services.workflow.progressLogFinish(operationId, True)
|
|
|
|
# Create meaningful filename
|
|
meaningfulName = self._generateMeaningfulFileName(
|
|
base_name="web_research",
|
|
extension="json",
|
|
action_name="research"
|
|
)
|
|
|
|
from modules.datamodels.datamodelChat import ActionDocument
|
|
actionDocument = ActionDocument(
|
|
documentName=meaningfulName,
|
|
documentData=result,
|
|
mimeType="application/json"
|
|
)
|
|
|
|
return ActionResult.isSuccess(documents=[actionDocument])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in web research: {str(e)}")
|
|
try:
|
|
self.services.workflow.progressLogFinish(operationId, False)
|
|
except:
|
|
pass
|
|
return ActionResult.isFailure(error=str(e))
|