""" AI processing method module. Handles direct AI calls for any type of task. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelWorkflow import ActionResult from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelWeb import WebResearchRequest, WebResearchOptions logger = logging.getLogger(__name__) class MethodAi(MethodBase): """AI processing methods.""" def __init__(self, services): super().__init__(services) self.name = "ai" self.description = "AI processing methods" def _format_timestamp_for_filename(self) -> str: """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") @action async def process(self, parameters: Dict[str, Any]) -> ActionResult: """ Perform an AI call for any type of task with optional document references Parameters: aiPrompt (str): The AI prompt for processing documentList (list, optional): List of document references to include in context expectedDocumentFormat (str, optional): Expected document output format with extension, mimeType, description processingMode (str, optional): Processing mode - use 'basic', 'advanced', or 'detailed' (defaults to 'basic') includeMetadata (bool, optional): Whether to include metadata (default: True) operationType (str, optional): Operation type - use 'general', 'generate_plan', 'analyse_content', 'generate_content', 'web_research', 'image_analysis', or 'image_generation' priority (str, optional): Priority level - use 'speed', 'quality', 'cost', or 'balanced' maxCost (float, optional): Maximum cost budget for the AI call maxProcessingTime (int, optional): Maximum processing time in seconds requiredTags (list, optional): Required model tags - use 'text', 'chat', 'reasoning', 'analysis', 'image', 'vision', 'web', 'search', etc. """ try: aiPrompt = parameters.get("aiPrompt") documentList = parameters.get("documentList", []) if isinstance(documentList, str): documentList = [documentList] expectedDocumentFormat = parameters.get("expectedDocumentFormat", "") processingMode = parameters.get("processingMode", "basic") includeMetadata = parameters.get("includeMetadata", True) operationType = parameters.get("operationType", "general") priority = parameters.get("priority", "balanced") maxCost = parameters.get("maxCost") maxProcessingTime = parameters.get("maxProcessingTime") requiredTags = parameters.get("requiredTags") if not aiPrompt: return ActionResult.isFailure( error="AI prompt is required" ) # Determine output format first (needed for context building) output_extension = ".txt" # Default output_mime_type = "text/plain" # Default if expectedDocumentFormat: output_extension = expectedDocumentFormat.get("extension", ".txt") output_mime_type = expectedDocumentFormat.get("mimeType", "text/plain") logger.info(f"Using expected format: {output_extension} ({output_mime_type})") # Get ChatDocuments for AI service - let AI service handle all document processing chatDocuments = [] if documentList: chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) if chatDocuments: logger.info(f"Prepared {len(chatDocuments)} documents for AI processing") # Build enhanced prompt enhanced_prompt = aiPrompt # Add processing mode instructions if specified (generic, not analysis-specific) if processingMode == "detailed": enhanced_prompt += "\n\nPlease provide a detailed response with comprehensive information." elif processingMode == "advanced": enhanced_prompt += "\n\nPlease provide an advanced response with deep insights." # Note: customInstructions parameter was removed as it's not defined in the method signature # Add format guidance to prompt if expectedDocumentFormat: enhanced_prompt += f"\n\nPlease try to deliver the result in {output_extension.upper()} format. If you cannot deliver in that specific format, please use an appropriate alternative format and include a comment explaining the format used." # Call AI service - it will handle all document processing internally logger.info(f"Executing AI call with mode: {processingMode}, prompt length: {len(enhanced_prompt)}") if chatDocuments: logger.info(f"Including {len(chatDocuments)} documents for AI processing") # Add JSON format instruction for structured response json_instruction = """ Please return your response in the following JSON format: {{ "documents": [ {{ "data": "your actual content here", "mimeType": "appropriate/mime-type", "comment": "optional comment about format or content" }} ] }} If you need to return multiple documents, add more objects to the documents array. The data field should contain the actual content, mimeType should be appropriate for the content format, and comment is optional. """ call_prompt = enhanced_prompt + json_instruction output_format = output_extension.replace('.', '') or 'txt' # Build options using new AiCallOptions format options = AiCallOptions( operationType=operationType, priority=priority, compressPrompt=processingMode != "detailed", compressContext=True, processDocumentsIndividually=True, processingMode=processingMode, resultFormat=output_format, maxCost=maxCost, maxProcessingTime=maxProcessingTime, requiredTags=requiredTags ) # Use the new AI service that handles document processing internally result = await self.services.ai.callAi( prompt=call_prompt, documents=chatDocuments if chatDocuments else None, options=options ) # Parse JSON response from AI import json documents = [] try: # Clean up the response (remove markdown code blocks if present) cleaned_result = (result or "").strip() if cleaned_result.startswith('```json'): cleaned_result = cleaned_result[7:] if cleaned_result.endswith('```'): cleaned_result = cleaned_result[:-3] cleaned_result = cleaned_result.strip() # Parse JSON response parsed_response = json.loads(cleaned_result) # Extract documents from response if isinstance(parsed_response, dict) and "documents" in parsed_response: for doc in parsed_response["documents"]: if isinstance(doc, dict): documents.append({ "documentName": f"ai_result_{len(documents) + 1}{output_extension}", "documentData": { "result": doc.get("data", ""), "fileName": f"ai_result_{len(documents) + 1}{output_extension}", "processedDocuments": len(chatDocuments) if chatDocuments else 0, "comment": doc.get("comment", "") }, "mimeType": doc.get("mimeType", output_mime_type) }) # If no documents found in JSON, create a single document from the raw result if not documents: documents.append({ "documentName": f"ai_result_1{output_extension}", "documentData": { "result": result, "fileName": f"ai_result_1{output_extension}", "processedDocuments": len(chatDocuments) if chatDocuments else 0, "comment": "Raw AI response (JSON parsing failed)" }, "mimeType": output_mime_type }) except Exception as e: # Fallback: create single document with raw result logger.warning(f"Failed to parse AI response as JSON: {str(e)}") documents.append({ "documentName": f"ai_result_1{output_extension}", "documentData": { "result": result, "fileName": f"ai_result_1{output_extension}", "processedDocuments": len(chatDocuments) if chatDocuments else 0, "comment": f"Raw AI response (JSON parsing failed: {str(e)})" }, "mimeType": output_mime_type }) # Return result in the standard ActionResult format with parsed documents return ActionResult.isSuccess( documents=documents ) except Exception as e: logger.error(f"Error in AI processing: {str(e)}") return ActionResult.isFailure( error=str(e) ) @action async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: """ Perform comprehensive web research using the full workflow. Parameters: search_query (str): The research query or question to investigate urls (list, optional): Specific URLs to crawl instead of searching max_results (int, optional): Maximum search results (default: 5) max_pages (int, optional): Maximum pages to crawl (default: 10) search_depth (str, optional): Tavily search depth - 'basic' or 'advanced' (default: 'basic') extract_depth (str, optional): Tavily extract depth - 'basic' or 'advanced' (default: 'advanced') format (str, optional): Content format - 'text' or 'markdown' (default: 'markdown') return_report (bool, optional): Return formatted report or raw data (default: True) country (str, optional): Country code for search bias time_range (str, optional): Time range for search - 'd', 'w', 'm', 'y' topic (str, optional): Search topic - 'general', 'news', 'academic' language (str, optional): Language code """ try: search_query = parameters.get("search_query") urls = parameters.get("urls") max_results = parameters.get("max_results", 5) max_pages = parameters.get("max_pages", 10) search_depth = parameters.get("search_depth", "basic") extract_depth = parameters.get("extract_depth", "advanced") format = parameters.get("format", "markdown") return_report = parameters.get("return_report", True) country = parameters.get("country") time_range = parameters.get("time_range") topic = parameters.get("topic") language = parameters.get("language") if not search_query: return ActionResult.isFailure( error="Search query is required" ) # Build WebResearchOptions options = WebResearchOptions( max_pages=max_pages, search_depth=search_depth, extract_depth=extract_depth, format=format, return_report=return_report, country=country, time_range=time_range, topic=topic, language=language ) # Build WebResearchRequest request = WebResearchRequest( search_query=search_query, urls=urls, max_results=max_results, options=options ) # Call web research service logger.info(f"Performing comprehensive web research for: {search_query}") logger.info(f"Max results: {max_results}, Max pages: {max_pages}") if urls: logger.info(f"Using provided URLs: {len(urls)}") result = await self.services.ai.webResearch(request) if not result.success: return ActionResult.isFailure(error=result.error) # Convert WebResearchActionResult to ActionResult format documents = [] for doc in result.documents: documents.append({ "documentName": doc.documentName, "documentData": { "search_query": doc.documentData.search_query, "websites_analyzed": doc.documentData.websites_analyzed, "additional_links_found": doc.documentData.additional_links_found, "analysis_result": doc.documentData.analysis_result, "sources": [{"title": s.title, "url": str(s.url)} for s in doc.documentData.sources], "additional_links": doc.documentData.additional_links, "debug_info": doc.documentData.debug_info }, "mimeType": doc.mimeType }) # Return result in the standard ActionResult format return ActionResult.isSuccess( documents=documents ) except Exception as e: logger.error(f"Error in web research: {str(e)}") return ActionResult.isFailure( error=str(e) )