# gateway/modules/workflows/methods/methodAi.py
"""
AI processing method module.
Handles direct AI calls for any type of task.
"""
import json
import logging
import os
import re
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionDocument, ActionResult
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelWeb import WebResearchRequest, WebResearchOptions

logger = logging.getLogger(__name__)
class MethodAi(MethodBase):
"""AI processing methods."""
def __init__(self, services):
super().__init__(services)
self.name = "ai"
self.description = "AI processing methods"
def _format_timestamp_for_filename(self) -> str:
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
@action
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
"""
        Perform a generic AI call with optional document references, returning the result as text content.
Parameters:
aiPrompt (str): The AI prompt for processing
documentList (list, optional): List of document references to include in context
            expectedDocumentFormat (str | dict, optional): Preferred output format, either an extension string (e.g. ".md") or a dict with "extension" and "mimeType" keys. Note: the returned document data is always text.
processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic')
includeMetadata (bool, optional): Whether to include metadata (default: True)
operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation'
priority (str, optional): Priority level - use 'speed', 'quality', 'cost', or 'balanced'
maxCost (float, optional): Maximum cost budget for the AI call
maxProcessingTime (int, optional): Maximum processing time in seconds
requiredTags (list, optional): Required model tags - use 'text', 'chat', 'reasoning', 'analysis', 'image', 'vision', 'web', 'search', etc.
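
        Example (illustrative only; the prompt, document id, and option values are hypothetical):
            parameters = {
                "aiPrompt": "Summarise the attached report",
                "documentList": ["doc-123"],
                "expectedDocumentFormat": ".md",
                "processingMode": "detailed",
                "priority": "quality",
                "requiredTags": ["text", "reasoning"],
            }
            result = await method_ai.process(parameters)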
"""
try:
aiPrompt = parameters.get("aiPrompt")
documentList = parameters.get("documentList", [])
if isinstance(documentList, str):
documentList = [documentList]
expectedDocumentFormat = parameters.get("expectedDocumentFormat", "")
processingMode = parameters.get("processingMode", "basic")
includeMetadata = parameters.get("includeMetadata", True)
operationType = parameters.get("operationType", "general")
priority = parameters.get("priority", "balanced")
maxCost = parameters.get("maxCost")
maxProcessingTime = parameters.get("maxProcessingTime")
requiredTags = parameters.get("requiredTags")
if not aiPrompt:
return ActionResult.isFailure(
error="AI prompt is required"
)
# Determine output format first (needed for context building)
output_extension = ".txt" # Default
output_mime_type = "text/plain" # Default
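            # expectedDocumentFormat accepts either a bare extension string (e.g. ".md")
            # or a dict such as {"extension": ".md", "mimeType": "text/markdown"}
            # (illustrative values; any extension/mimeType pair can be supplied).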
if expectedDocumentFormat:
if isinstance(expectedDocumentFormat, dict):
output_extension = expectedDocumentFormat.get("extension", ".txt")
output_mime_type = expectedDocumentFormat.get("mimeType", "text/plain")
                else:
                    # If it's a string, treat it as the extension; normalize to a leading dot
                    output_extension = expectedDocumentFormat if expectedDocumentFormat.startswith(".") else f".{expectedDocumentFormat}"
                    output_mime_type = "text/plain"
logger.info(f"Using expected format: {output_extension} ({output_mime_type})")
# Get ChatDocuments for AI service - let AI service handle all document processing
chatDocuments = []
if documentList:
chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
if chatDocuments:
logger.info(f"Prepared {len(chatDocuments)} documents for AI processing")
# Build enhanced prompt
enhanced_prompt = aiPrompt
# Add processing mode instructions if specified (generic, not analysis-specific)
if processingMode == "detailed":
enhanced_prompt += "\n\nPlease provide a detailed response with comprehensive information."
elif processingMode == "advanced":
enhanced_prompt += "\n\nPlease provide an advanced response with deep insights."
# Note: customInstructions parameter was removed as it's not defined in the method signature
# Add format guidance to prompt
if expectedDocumentFormat:
                enhanced_prompt += f"\n\nPlease try to deliver the result in {output_extension.lstrip('.').upper()} format. If you cannot deliver in that specific format, please use an appropriate alternative format and include a comment explaining the format used."
# Call AI service - it will handle all document processing internally
logger.info(f"Executing AI call with mode: {processingMode}, prompt length: {len(enhanced_prompt)}")
if chatDocuments:
logger.info(f"Including {len(chatDocuments)} documents for AI processing")
# Add JSON format instruction for structured response
            json_instruction = """
Please return your response in the following JSON format:
{
    "documents": [
        {
            "data": "your actual content here",
            "mimeType": "appropriate/mime-type",
            "comment": "optional comment about format or content"
        }
    ]
}
If you need to return multiple documents, add more objects to the documents array. The data field should contain the actual content, mimeType should be appropriate for the content format, and comment is optional.
"""
call_prompt = enhanced_prompt + json_instruction
            output_format = output_extension.lstrip('.') or 'txt'
# Build options using new AiCallOptions format
options = AiCallOptions(
operationType=operationType,
priority=priority,
compressPrompt=processingMode != "detailed",
compressContext=True,
processDocumentsIndividually=True,
processingMode=processingMode,
resultFormat=output_format,
maxCost=maxCost,
maxProcessingTime=maxProcessingTime,
requiredTags=requiredTags
)
# Use the new AI service that handles document processing internally
result = await self.services.ai.callAi(
prompt=call_prompt,
documents=chatDocuments if chatDocuments else None,
options=options
)
            # DEBUG dump: write raw AI result to @testing_extraction/ TODO Remove
            debug_dir = None
            try:
                debug_root = "../local/testing_extraction"
                ts = self._format_timestamp_for_filename()
                debug_dir = os.path.join(debug_root, f"method_ai_{ts}")
                os.makedirs(debug_dir, exist_ok=True)
                with open(os.path.join(debug_dir, "raw_result.txt"), "w", encoding="utf-8") as f:
                    f.write(str(result) if result is not None else "")
            except Exception:
                debug_dir = None
# Parse JSON response from AI and create proper ActionDocument objects
action_documents = []
try:
# Clean up the response (remove markdown code blocks if present)
cleaned_result = (result or "").strip()
# Remove code fences anywhere in the text
cleaned_result = re.sub(r"```json|```", "", cleaned_result).strip()
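                # e.g. "```json\n{ ... }\n```" becomes "{ ... }"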
# Try direct parse first
try:
parsed_response = json.loads(cleaned_result)
except Exception:
# Heuristic extraction: find the largest {...} block
start = cleaned_result.find("{")
end = cleaned_result.rfind("}")
if start != -1 and end != -1 and end > start:
candidate = cleaned_result[start:end+1]
# Remove trailing commas before closing braces/brackets
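                        # e.g. '{"documents": [{"data": "x",},]}' -> '{"documents": [{"data": "x"}]}'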
candidate = re.sub(r",\s*([}\]])", r"\1", candidate)
parsed_response = json.loads(candidate)
                    else:
                        # No JSON object present; re-raise and fall back to the raw-result document below
                        raise
# Extract documents from response
if isinstance(parsed_response, dict) and "documents" in parsed_response:
for doc in parsed_response["documents"]:
if isinstance(doc, dict):
# Generate meaningful file name with workflow context
extension = output_extension.lstrip('.') # Remove leading dot
meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension=extension,
action_name="result"
)
action_documents.append(ActionDocument(
documentName=meaningful_name,
documentData=doc.get("data", ""),
mimeType=doc.get("mimeType", output_mime_type)
))
# If no documents found in JSON, create a single document from the raw result
if not action_documents:
extension = output_extension.lstrip('.') # Remove leading dot
meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension=extension,
action_name="result"
)
action_documents.append(ActionDocument(
documentName=meaningful_name,
                        documentData=result or "",
mimeType=output_mime_type
))
except Exception as e:
# Fallback: create single document with raw result
logger.warning(f"Failed to parse AI response as JSON: {str(e)}")
extension = output_extension.lstrip('.') # Remove leading dot
meaningful_name = self._generateMeaningfulFileName(
base_name="ai",
extension=extension,
action_name="result"
)
action_documents.append(ActionDocument(
documentName=meaningful_name,
                    documentData=result or "",
mimeType=output_mime_type
))
            # DEBUG dump: write parsed documents to files in the same debug folder
            try:
                # Reuse the debug_dir created above; fall back to a fresh timestamped folder
                if debug_dir is None:
                    debug_root = "../local/testing_extraction"
                    ts = self._format_timestamp_for_filename()
                    debug_dir = os.path.join(debug_root, f"method_ai_{ts}")
                os.makedirs(debug_dir, exist_ok=True)
# Write a summary and individual documents
summary_lines: List[str] = [f"documents: {len(action_documents)}"]
for i, doc in enumerate(action_documents, 1):
summary_lines.append(f"doc[{i}]: name={doc.documentName}, mimeType={doc.mimeType}")
safe_name = doc.documentName or f"doc_{i:03d}.txt"
fpath = os.path.join(debug_dir, safe_name)
with open(fpath, "w", encoding="utf-8") as f:
f.write(str(doc.documentData) if doc.documentData is not None else "")
with open(os.path.join(debug_dir, "summary.txt"), "w", encoding="utf-8") as f:
f.write("\n".join(summary_lines))
except Exception:
pass
# Return result in the standard ActionResult format with parsed documents
return ActionResult.isSuccess(
documents=action_documents
)
except Exception as e:
logger.error(f"Error in AI processing: {str(e)}")
return ActionResult.isFailure(
error=str(e)
)
@action
async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
"""
Perform comprehensive web research using the full workflow.
Parameters:
user_prompt (str): The user input or question to investigate
urls (list, optional): Specific URLs to crawl instead of searching
max_results (int, optional): Maximum search results (default: 5)
max_pages (int, optional): Maximum pages to crawl (default: 10)
search_depth (str, optional): Tavily search depth - 'basic' or 'advanced' (default: 'basic')
extract_depth (str, optional): Tavily extract depth - 'basic' or 'advanced' (default: 'advanced')
pages_search_depth (int, optional): How deep to crawl - 1=main pages only, 2=main+sub-pages, 3=main+sub+sub-sub, etc. (default: 2)
country (str, optional): Country code for search bias
time_range (str, optional): Time range for search - 'd', 'w', 'm', 'y'
topic (str, optional): Search topic - 'general', 'news', 'academic'
language (str, optional): Language code
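
        Example (illustrative only; the query and option values are hypothetical):
            parameters = {
                "user_prompt": "Recent developments in solid-state batteries",
                "max_results": 5,
                "search_depth": "advanced",
                "time_range": "m",
            }
            result = await method_ai.webResearch(parameters)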
"""
try:
user_prompt = parameters.get("user_prompt")
urls = parameters.get("urls")
max_results = parameters.get("max_results", 5)
max_pages = parameters.get("max_pages", 10)
search_depth = parameters.get("search_depth", "basic")
extract_depth = parameters.get("extract_depth", "advanced")
pages_search_depth = parameters.get("pages_search_depth", 2)
country = parameters.get("country")
time_range = parameters.get("time_range")
topic = parameters.get("topic")
language = parameters.get("language")
if not user_prompt:
return ActionResult.isFailure(
error="Search query is required"
)
# Build WebResearchOptions
options = WebResearchOptions(
max_pages=max_pages,
search_depth=search_depth,
extract_depth=extract_depth,
pages_search_depth=pages_search_depth,
country=country,
time_range=time_range,
topic=topic,
language=language
)
# Build WebResearchRequest
request = WebResearchRequest(
user_prompt=user_prompt,
urls=urls,
max_results=max_results,
options=options
)
# Call web research service
logger.info(f"Performing comprehensive web research for: {user_prompt}")
logger.info(f"Max results: {max_results}, Max pages: {max_pages}")
if urls:
logger.info(f"Using provided URLs: {len(urls)}")
result = await self.services.ai.webResearch(request)
if not result.success:
return ActionResult.isFailure(error=result.error)
# Convert WebResearchActionResult to ActionResult format
documents = []
for doc in result.documents:
documents.append({
"documentName": doc.documentName,
"documentData": {
"user_prompt": doc.documentData.user_prompt,
"websites_analyzed": doc.documentData.websites_analyzed,
"additional_links_found": doc.documentData.additional_links_found,
"analysis_result": doc.documentData.analysis_result,
"sources": [{"title": s.title, "url": str(s.url)} for s in doc.documentData.sources],
"additional_links": doc.documentData.additional_links,
"debug_info": doc.documentData.debug_info
},
"mimeType": doc.mimeType
})
# Return result in the standard ActionResult format
return ActionResult.isSuccess(
documents=documents
)
except Exception as e:
logger.error(f"Error in web research: {str(e)}")
return ActionResult.isFailure(
error=str(e)
)