gateway/modules/workflows/methods/methodAi.py
2025-10-04 18:44:42 +02:00

579 lines
27 KiB
Python

"""
AI processing method module.
Handles direct AI calls for any type of task.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime, UTC
from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelWorkflow import ActionResult
from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority
from modules.datamodels.datamodelChat import ChatDocument
from modules.datamodels.datamodelWeb import WebResearchRequest, WebResearchOptions
logger = logging.getLogger(__name__)
class MethodAi(MethodBase):
"""AI processing methods."""
def __init__(self, services):
    """Initialise the base method with *services*, then label this method group."""
    super().__init__(services)
    # Assigned after the base initialiser so these are the values left on the instance.
    self.name, self.description = "ai", "AI processing methods"
def _format_timestamp_for_filename(self) -> str:
"""Format current timestamp as YYYYMMDD-hhmmss for filenames."""
return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")
@action
async def process(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    AI data delivery and analysis - returns plain text only, NO document generation
    USE FOR: Data delivery, analysis, research, Q&A, summarization, translation
    DO NOT USE FOR: Code generation, creating formatted documents (Word, PDF, Excel), document generation, file creation
    INPUT REQUIREMENTS: Requires aiPrompt parameter (what to deliver)
    OUTPUT FORMAT: Plain text only (.txt, .json, .md, .csv, .xml) - NO binary files
    DEPENDENCIES: None - can work standalone
    WORKFLOW POSITION: Use for data delivery, analysis, research, or text processing tasks
    Parameters:
        aiPrompt (str): The AI prompt for what we want to have delivered
        documentList (list, optional): List of document references to include in context
        resultType (str, optional): Output format type - use 'txt', 'json', 'md', 'csv', or 'xml' (defaults to 'txt')
        processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic')
        includeMetadata (bool, optional): Whether to include metadata (default: True)
        operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation'
        priority (str, optional): Priority level - use 'speed', 'quality', 'cost', or 'balanced'
        maxCost (float, optional): Maximum cost budget for the AI call
        maxProcessingTime (int, optional): Maximum processing time in seconds
        requiredTags (list, optional): Required model tags - use 'text', 'chat', 'reasoning', 'analysis', 'image', 'vision', 'web', 'search', etc.
    """
    # NOTE(review): the docstring above reads like a runtime action description, so its
    # wording is kept as it was - confirm before editing it.
    #
    # Flow: validate parameters -> build the prompt -> call the AI service -> collect the
    # "documents[].data" payloads (following "continue": true with further calls) ->
    # merge them into one ActionDocument. Any unexpected error becomes ActionResult.isFailure.
    import os
    from modules.datamodels.datamodelWorkflow import ActionDocument

    # Upper bound on AI calls per request; a response that always says "continue": true
    # would otherwise keep this loop (and its cost) running forever.
    max_chunks = 50

    try:
        # Parameters can contain user content, so the full dump is debug-level only.
        logger.debug(f"MethodAi.process received parameters: {parameters}")

        aiPrompt = parameters.get("aiPrompt")
        if not aiPrompt:
            logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
            return ActionResult.isFailure(
                error="AI prompt is required"
            )

        documentList = parameters.get("documentList", [])
        if isinstance(documentList, str):
            documentList = [documentList]
        processingMode = parameters.get("processingMode", "basic")
        operationType = parameters.get("operationType", "general")
        priority = parameters.get("priority", "balanced")
        maxCost = parameters.get("maxCost")
        maxProcessingTime = parameters.get("maxProcessingTime")
        requiredTags = parameters.get("requiredTags")
        # "includeMetadata" is documented above but nothing in this method acts on it,
        # so it is intentionally not read.

        # Accept "JSON", " json " or ".json" as well as the canonical lower-case names;
        # a missing/None value falls back to plain text.
        resultType = str(parameters.get("resultType") or "txt").strip().lower().lstrip(".")

        # resultType -> (file extension, MIME type)
        format_mapping = {
            "txt": (".txt", "text/plain"),
            "json": (".json", "application/json"),
            "md": (".md", "text/markdown"),
            "csv": (".csv", "text/csv"),
            "xml": (".xml", "application/xml")
        }
        if resultType not in format_mapping:
            return ActionResult.isFailure(
                error=f"Invalid resultType '{resultType}'. Must be one of: {', '.join(format_mapping)}"
            )
        output_extension, output_mime_type = format_mapping[resultType]
        logger.info(f"Using result type: {resultType} -> {output_extension} ({output_mime_type})")

        # Resolve document references; the AI service receives the resolved documents.
        chatDocuments = []
        if documentList:
            chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
        if chatDocuments:
            logger.info(f"Prepared {len(chatDocuments)} documents for AI processing")

        # Build the prompt: user prompt + mode hint + format hint + response envelope.
        enhanced_prompt = aiPrompt
        if processingMode == "detailed":
            enhanced_prompt += "\n\nPlease provide a detailed response with comprehensive information."
        elif processingMode == "advanced":
            enhanced_prompt += "\n\nPlease provide an advanced response with deep insights."
        if resultType != "txt":
            enhanced_prompt += f"\n\nPlease deliver the result in {resultType.upper()} format. Ensure the output follows the proper {resultType.upper()} syntax and structure."
        logger.info(f"Executing AI call with mode: {processingMode}, prompt length: {len(enhanced_prompt)}")
        call_prompt = enhanced_prompt + self._buildFormatInstruction(resultType, output_mime_type)

        options = AiCallOptions(
            operationType=operationType,
            priority=priority,
            compressPrompt=processingMode != "detailed",
            compressContext=True,
            processDocumentsIndividually=True,
            processingMode=processingMode,
            resultFormat=resultType,
            maxCost=maxCost,
            maxProcessingTime=maxProcessingTime,
            requiredTags=requiredTags
        )
        result = await self.services.ai.callAi(
            prompt=call_prompt,
            documents=chatDocuments if chatDocuments else None,
            options=options
        )

        # DEBUG dump (TODO Remove): one folder per call, shared by the raw result and the
        # parsed documents so both end up side by side.
        debug_dir = os.path.join(
            "./test-chat/extraction",
            f"method_ai_{self._format_timestamp_for_filename()}"
        )
        self._dumpDebugArtifacts(debug_dir, files={"raw_result.txt": result})

        # Collect the data payloads, following "continue": true with further AI calls.
        all_data_chunks: List[Any] = []
        current_result = result
        chunk_number = 0
        try:
            while True:
                chunk_number += 1
                logger.info(f"Processing AI response chunk {chunk_number}")
                parsed_response = self._parseAiJsonResponse(current_result)
                if not isinstance(parsed_response, dict):
                    raise ValueError("AI response is not a JSON object")

                documents = parsed_response.get("documents")
                if isinstance(documents, list):
                    all_data_chunks.extend(
                        doc.get("data", "") for doc in documents if isinstance(doc, dict)
                    )

                if not self._isContinueRequested(parsed_response.get("continue", False)):
                    break
                if chunk_number >= max_chunks:
                    logger.warning(f"Stopping after {max_chunks} chunks although the AI requested more")
                    break

                logger.info(f"AI indicated more data coming, requesting chunk {chunk_number + 1}")
                # NOTE(review): the continuation call carries neither the original prompt
                # nor the documents - confirm the AI service keeps that context itself.
                current_result = await self.services.ai.callAi(
                    prompt=self._buildContinuationPrompt(all_data_chunks, output_mime_type, chunk_number + 1),
                    options=options
                )
                if not current_result:
                    logger.warning("No response for continuation chunk, stopping")
                    break
        except Exception as e:
            # Chunks gathered before the failure are still used below; only when nothing
            # was extracted does the raw first response become the document.
            logger.warning(f"Failed to parse AI response as JSON: {str(e)}")

        if all_data_chunks:
            document_data = self._mergeDataChunks(all_data_chunks, resultType, output_mime_type)
        else:
            document_data = result

        meaningful_name = self._generateMeaningfulFileName(
            base_name="ai",
            extension=output_extension.lstrip('.'),
            action_name="result"
        )
        action_documents = [ActionDocument(
            documentName=meaningful_name,
            documentData=document_data,
            mimeType=output_mime_type
        )]

        self._dumpDebugArtifacts(debug_dir, documents=action_documents)

        return ActionResult.isSuccess(
            documents=action_documents
        )
    except Exception as e:
        logger.exception(f"Error in AI processing: {str(e)}")
        return ActionResult.isFailure(
            error=str(e)
        )


def _buildFormatInstruction(self, resultType: str, mimeType: str) -> str:
    """Return the prompt suffix that describes the JSON envelope the AI must answer with.

    One template serves every result type, so the braces are always emitted as single
    characters (the JSON variant used to show doubled braces to the model).
    """
    if resultType == "json":
        data_placeholder = "your actual content here"
        data_note = "The data field should contain valid JSON content."
    else:
        format_name = resultType.upper()
        data_placeholder = f"your actual content here in {format_name} format"
        data_note = f"The data field should contain the content in {format_name} format."
    return "\n".join([
        "",
        "Please return your response in the following JSON format:",
        "{",
        '"documents": [',
        "{",
        f'"data": "{data_placeholder}",',
        f'"mimeType": "{mimeType}",',
        '"comment": "optional comment about content"',
        "}",
        "],",
        '"continue": false',
        "}",
        data_note,
        'For large datasets, set "continue": true to indicate more data is coming, and we\'ll ask for the next chunk.',
        "",
    ])


def _buildContinuationPrompt(self, chunks: List[Any], mimeType: str, nextChunkNumber: int) -> str:
    """Return the prompt asking for the next chunk, with a short recap of what was received."""
    context = ""
    if chunks:
        total_chars = sum(len(str(chunk)) for chunk in chunks)
        last_chunk = str(chunks[-1])
        preview = last_chunk[:200] + ("..." if len(last_chunk) > 200 else "")
        context = "\n".join([
            "",
            f"CONTEXT: You have already provided {len(chunks)} chunks of data ({total_chars} characters total).",
            f"The last chunk contained: {preview}",
            "Please continue with the next chunk, ensuring no duplication of previous data.",
            "",
        ])
    return "\n".join([
        "",
        context,
        "Please continue with the next chunk of data. Return the same JSON format:",
        "{",
        '"documents": [',
        "{",
        '"data": "next chunk of data here",',
        f'"mimeType": "{mimeType}",',
        f'"comment": "chunk {nextChunkNumber}"',
        "}",
        "],",
        '"continue": false',
        "}",
        'Set "continue": false when this is the final chunk.',
        "",
    ])


def _isContinueRequested(self, flag: Any) -> bool:
    """Interpret the "continue" field; the strings "true"/"false" are read by meaning, not truthiness."""
    if isinstance(flag, str):
        return flag.strip().lower() == "true"
    return bool(flag)


def _parseAiJsonResponse(self, raw: Any) -> Any:
    """Parse an AI text response into a JSON value.

    Attempts, from least to most invasive so payload content is not altered needlessly:
    1. the text as-is;
    2. the text without a wrapping markdown code fence (inner fences are kept);
    3. the outermost ``{...}`` span;
    4. that span with trailing commas before ``}`` / ``]`` removed.

    Raises:
        TypeError: if ``raw`` is neither a string nor None.
        ValueError: if no attempt yields valid JSON.
    """
    import json
    import re

    if raw is not None and not isinstance(raw, str):
        raise TypeError(f"Expected a text response, got {type(raw).__name__}")
    text = (raw or "").strip()
    try:
        return json.loads(text)
    except ValueError:
        pass

    unfenced = re.sub(r"\A```[\w-]*\s*|\s*```\Z", "", text).strip()
    try:
        return json.loads(unfenced)
    except ValueError:
        pass

    start = unfenced.find("{")
    end = unfenced.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in AI response")
    candidate = unfenced[start:end + 1]
    try:
        return json.loads(candidate)
    except ValueError:
        return json.loads(re.sub(r",\s*([}\]])", r"\1", candidate))


def _dumpDebugArtifacts(self, debug_dir: str, files: Optional[Dict[str, Any]] = None, documents: Optional[List[Any]] = None) -> None:
    """Best-effort debug dump; never raises.

    Writes every ``files`` entry (name -> content) into ``debug_dir``. When ``documents``
    is given, each document's data is written under its name together with a
    ``summary.txt`` listing names and MIME types. Names are reduced to their base name
    so a document name cannot point outside ``debug_dir``.
    """
    import os

    try:
        to_write = dict(files or {})
        if documents is not None:
            summary_lines = [f"documents: {len(documents)}"]
            for i, doc in enumerate(documents, 1):
                summary_lines.append(f"doc[{i}]: name={doc.documentName}, mimeType={doc.mimeType}")
                to_write[doc.documentName or f"doc_{i:03d}.txt"] = doc.documentData
            to_write["summary.txt"] = "\n".join(summary_lines)
        os.makedirs(debug_dir, exist_ok=True)
        for name, content in to_write.items():
            safe_name = os.path.basename(str(name)) or "unnamed.txt"
            with open(os.path.join(debug_dir, safe_name), "w", encoding="utf-8") as f:
                f.write(str(content) if content is not None else "")
    except Exception as e:
        logger.debug(f"Debug dump to {debug_dir} failed: {str(e)}")
@action
async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
    """
    Comprehensive web research and information gathering from the internet
    USE FOR: Finding current information, researching topics, gathering external data, fact-checking, market research
    DO NOT USE FOR: Processing local documents, creating formatted reports, email operations
    INPUT REQUIREMENTS: Requires user_prompt parameter (the research question or topic to investigate)
    OUTPUT FORMAT: JSON with research results, sources, and analysis
    DEPENDENCIES: Requires internet connection and web search capabilities
    WORKFLOW POSITION: Use when external information is needed, before document processing
    Parameters:
        user_prompt (str): The research question or topic to investigate - describe what information you want to find
        urls (list, optional): Specific URLs to crawl instead of searching
        max_results (int, optional): Maximum search results (default: 10)
        max_pages (int, optional): Maximum pages to crawl (default: 10)
        search_depth (str, optional): Tavily search depth - MUST be 'basic' or 'advanced' (default: 'basic')
        extract_depth (str, optional): Tavily extract depth - MUST be 'basic' or 'advanced' (default: 'advanced')
        pages_search_depth (int, optional): How deep to crawl - 1=main pages only, 2=main+sub-pages, 3=main+sub+sub-sub, etc. (default: 2)
        country (str, optional): Country code for search bias (e.g., 'CH', 'US', 'DE')
        time_range (str, optional): Time range for search - Use 'd' (day), 'w' (week), 'm' (month), 'y' (year) if needed, otherwise OMIT this parameter entirely
        topic (str, optional): Search topic - Use 'general', 'news', or 'academic' if needed, otherwise OMIT this parameter entirely
        language (str, optional): Language code (e.g., 'de', 'en', 'fr')
    """
    # NOTE(review): the docstring above reads like a runtime action description, so its
    # wording is kept as it was - confirm before editing it.
    #
    # Flow: validate -> build WebResearchOptions/WebResearchRequest -> call the web
    # research service -> flatten its documents into plain dicts for ActionResult.
    try:
        user_prompt = parameters.get("user_prompt")
        if not user_prompt:
            return ActionResult.isFailure(
                error="Search query is required"
            )

        urls = parameters.get("urls")
        # A single URL given as a string is wrapped into a list, the same way process()
        # treats documentList; otherwise len(urls) below would count characters.
        if isinstance(urls, str):
            urls = [urls]
        max_results = parameters.get("max_results", 10)
        max_pages = parameters.get("max_pages", 10)

        options = WebResearchOptions(
            max_pages=max_pages,
            search_depth=parameters.get("search_depth", "basic"),
            extract_depth=parameters.get("extract_depth", "advanced"),
            pages_search_depth=parameters.get("pages_search_depth", 2),
            country=parameters.get("country"),
            time_range=parameters.get("time_range"),
            topic=parameters.get("topic"),
            language=parameters.get("language")
        )
        request = WebResearchRequest(
            user_prompt=user_prompt,
            urls=urls,
            max_results=max_results,
            options=options
        )

        logger.info(f"Performing comprehensive web research for: {user_prompt}")
        logger.info(f"Max results: {max_results}, Max pages: {max_pages}")
        if urls:
            logger.info(f"Using provided URLs: {len(urls)}")

        result = await self.services.ai.webResearch(request)
        if not result.success:
            return ActionResult.isFailure(error=result.error)

        # Convert the service result into plain dictionaries. Missing document or source
        # lists are treated as empty instead of raising.
        documents = []
        for doc in (result.documents or []):
            data = doc.documentData
            documents.append({
                "documentName": doc.documentName,
                "documentData": {
                    "user_prompt": data.user_prompt,
                    "websites_analyzed": data.websites_analyzed,
                    "additional_links_found": data.additional_links_found,
                    "analysis_result": data.analysis_result,
                    "sources": [{"title": s.title, "url": str(s.url)} for s in (data.sources or [])],
                    "additional_links": data.additional_links,
                    "debug_info": data.debug_info
                },
                "mimeType": doc.mimeType
            })

        return ActionResult.isSuccess(
            documents=documents
        )
    except Exception as e:
        logger.error(f"Error in web research: {str(e)}")
        return ActionResult.isFailure(
            error=str(e)
        )
def _mergeDataChunks(self, chunks: List[str], resultType: str, mimeType: str) -> str:
    """Combine the collected chunks with the merger that belongs to ``resultType``.

    JSON, table-like (csv/table) and text-like (txt/md/text) types are handed to their
    dedicated merge methods; every other type is joined line by line. If the chosen
    merger raises, the failure is logged and the line-by-line join is used instead.
    ``mimeType`` is accepted but not consulted.
    """
    def joinLines() -> str:
        return "\n".join(str(piece) for piece in chunks)

    # (result types, name of the method that merges them) - checked in this order.
    strategies = (
        (("json",), "_mergeJsonChunks"),
        (("csv", "table"), "_mergeTableChunks"),
        (("txt", "md", "text"), "_mergeTextChunks"),
    )
    try:
        for type_names, merger_name in strategies:
            if resultType in type_names:
                return getattr(self, merger_name)(chunks)
        # Default: simple concatenation
        return joinLines()
    except Exception as e:
        logger.warning(f"Failed to merge chunks intelligently: {str(e)}, using simple concatenation")
        return joinLines()
def _mergeJsonChunks(self, chunks: List[str]) -> str:
    """Merge JSON chunks into a single JSON document (pretty-printed, indent 2).

    Each chunk is either JSON text or an already decoded value. Rules:
    - lists are concatenated;
    - objects are merged key by key while only objects have been seen;
    - as soon as kinds are mixed (object followed by list/scalar, or the reverse) the
      result becomes a list that holds everything in arrival order;
    - text that is not valid JSON is logged and kept as a plain string element.

    Input chunks are never modified. An empty ``chunks`` yields ``"[]"``.
    """
    import json

    merged: Any = []
    for i, chunk in enumerate(chunks):
        value: Any = chunk
        if isinstance(chunk, str):
            try:
                value = json.loads(chunk)
            except ValueError as e:
                # Keep the raw text (value is still the original string).
                logger.warning(f"Failed to parse chunk {i}: {str(e)}")

        if isinstance(value, dict):
            if not merged:
                # Copy so later update() calls cannot modify the caller's object.
                merged = dict(value)
            elif isinstance(merged, dict):
                merged.update(value)
            else:
                merged.append(value)
            continue

        # A non-object value arrived: an object accumulated so far becomes the first
        # list element, because dicts have neither extend() nor append().
        if isinstance(merged, dict):
            merged = [merged] if merged else []
        if isinstance(value, list):
            merged.extend(value)
        else:
            merged.append(value)
    return json.dumps(merged, indent=2)
def _mergeTableChunks(self, chunks: List[str]) -> str:
    """Merge CSV chunks into one CSV text, keeping the header row only once.

    The header is the first row of the first chunk that contains any rows (leading
    empty chunks are skipped). A later chunk whose first row equals that header has the
    row dropped; all other rows are appended unchanged. A chunk the csv module cannot
    parse is logged and represented by a single placeholder row. Output is produced by
    ``csv.writer`` with its default dialect.
    """
    import csv
    import io

    merged_rows = []
    headers = None
    for i, chunk in enumerate(chunks):
        try:
            rows = list(csv.reader(io.StringIO(str(chunk))))
        except csv.Error as e:
            logger.warning(f"Failed to parse table chunk {i}: {str(e)}")
            # Add as raw text if CSV parsing fails
            merged_rows.append([f"Raw chunk {i}: {str(chunk)[:100]}..."])
            continue
        if not rows:
            continue
        if headers is None:
            # First chunk with content defines the header, whatever its position.
            headers = rows[0]
        elif rows[0] == headers:
            rows = rows[1:]  # Skip duplicate header
        merged_rows.extend(rows)

    # Convert back to CSV
    output = io.StringIO()
    csv.writer(output).writerows(merged_rows)
    return output.getvalue()
def _mergeTextChunks(self, chunks: List[str]) -> str:
    """Join text chunks into one text, separated by a blank line.

    Every chunk is converted to a string and trimmed; chunks that are empty after
    trimming are left out.
    """
    trimmed = (str(piece).strip() for piece in chunks)
    # Blank line between chunks keeps the merged text readable.
    return "\n\n".join(text for text in trimmed if text)