"""
AI processing method module.
Handles direct AI calls for any type of task.
"""

import json
import logging
import time
from datetime import datetime, UTC
from typing import Any, Callable, Dict, List, Optional

from modules.workflows.methods.methodBase import MethodBase, action
from modules.datamodels.datamodelChat import ActionDocument, ActionResult
from modules.datamodels.datamodelAi import AiCallOptions, OperationTypeEnum
# NOTE(review): ChatDocument and WebResearchRequest are not referenced in this
# module; the imports are kept in case other code relies on them being here.
from modules.datamodels.datamodelChat import ChatDocument
from modules.aicore.aicorePluginTavily import WebResearchRequest

logger = logging.getLogger(__name__)


class MethodAi(MethodBase):
    """AI processing methods."""

    # NOTE(review): the docstrings of the @action methods follow a fixed
    # "GENERAL / Parameters" layout and are presumably read by the workflow
    # layer as action metadata - confirm before rewording them. Their text is
    # therefore kept verbatim and reviewer notes live in '#' comments.

    def __init__(self, services):
        super().__init__(services)
        self.name = "ai"
        self.description = "AI processing methods"

    def _format_timestamp_for_filename(self) -> str:
        """Format current timestamp as YYYYMMDD-hhmmss for filenames."""
        return datetime.now(UTC).strftime("%Y%m%d-%H%M%S")

    def _finishProgressQuietly(self, operationId: Optional[str], success: bool) -> None:
        """Close a progress entry without letting progress logging raise.

        Does nothing when operationId is None, i.e. when the failure happened
        before an operation id could be built.
        """
        if operationId is None:
            return
        try:
            self.services.workflow.progressLogFinish(operationId, success)
        except Exception:
            # Progress logging is best-effort; never mask the real outcome.
            logger.debug("Could not finish progress log for %s", operationId, exc_info=True)

    async def _callAiWithJsonPrompt(
        self,
        promptData: Dict[str, Any],
        operationType: OperationTypeEnum,
        baseName: str,
        actionName: str,
        *,
        resultFormat: str = "json",
        extension: str = "json",
        mimeType: str = "application/json",
        postProcess: Optional[Callable[[Any], Any]] = None,
    ) -> ActionResult:
        """Send promptData as a JSON prompt to the AI service and wrap the reply in one document.

        Args:
            promptData: Payload serialized with json.dumps(indent=2) and used as the prompt.
            operationType: Operation type placed into the AiCallOptions.
            baseName: base_name passed to _generateMeaningfulFileName.
            actionName: action_name passed to _generateMeaningfulFileName.
            resultFormat: Used both as AiCallOptions.resultFormat and as outputFormat.
            extension: File extension of the generated document name.
            mimeType: MIME type stored on the resulting ActionDocument.
            postProcess: Optional callable applied to the raw AI result before wrapping.

        Returns:
            A successful ActionResult holding exactly one ActionDocument.

        Exceptions from the AI service are not caught here; each action keeps
        its own handler so the log message names the failing action.
        """
        options = AiCallOptions(operationType=operationType, resultFormat=resultFormat)
        prompt = json.dumps(promptData, indent=2)

        result = await self.services.ai.callAiDocuments(
            prompt=prompt,
            documents=None,
            options=options,
            outputFormat=resultFormat,
        )
        if postProcess is not None:
            result = postProcess(result)

        documentName = self._generateMeaningfulFileName(
            base_name=baseName,
            extension=extension,
            action_name=actionName,
        )
        document = ActionDocument(
            documentName=documentName,
            documentData=result,
            mimeType=mimeType,
        )
        return ActionResult.isSuccess(documents=[document])

    @action
    async def process(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Process a user prompt with optional unlimited input documents to produce one or many output documents of the SAME format.
        - Input requirements: aiPrompt (required); optional documentList.
        - Output format: Exactly one file format to select. For multiple output file formats you need to do different calls.

        Parameters:
        - aiPrompt (str, required): Instruction for the AI.
        - documentList (list, optional): Document reference(s) for context.
        - resultType (str, optional): Output file extension - only one extension allowed (e.g. txt, json, md, csv, xml, html, pdf, docx, xlsx, png, ...). Default: txt.
        """
        # Bound before the try so the except handler can always reference it.
        operationId: Optional[str] = None
        try:
            operationId = f"ai_process_{self.services.currentWorkflow.id}_{int(time.time())}"

            # An explicit None/empty resultType falls back to txt instead of
            # being stringified into a bogus ".none" extension.
            resultType = parameters.get("resultType") or "txt"

            # Start progress tracking
            self.services.workflow.progressLogStart(
                operationId,
                "Generate",
                "AI Processing",
                f"Format: {resultType}",
            )

            # Debug logging to see what parameters are received
            logger.info(f"MethodAi.process received parameters: {parameters}")
            logger.info(f"Parameters type: {type(parameters)}")
            logger.info(f"Parameters keys: {list(parameters.keys()) if isinstance(parameters, dict) else 'Not a dict'}")

            aiPrompt = parameters.get("aiPrompt")
            logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})")

            self.services.workflow.progressLogUpdate(operationId, 0.2, "Preparing parameters")

            # A single reference given as a plain string is treated as a one-item list.
            documentList = parameters.get("documentList") or []
            if isinstance(documentList, str):
                documentList = [documentList]

            if not aiPrompt:
                logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}")
                # The progress entry was already started above; close it as
                # failed so it is not left open.
                self._finishProgressQuietly(operationId, False)
                return ActionResult.isFailure(error="AI prompt is required")

            # Determine output extension and default MIME type without duplicating service logic
            normalized_result_type = str(resultType).strip().lstrip('.').lower() or "txt"
            output_extension = f".{normalized_result_type}"
            # Fallback only: a mimeType supplied by the service takes precedence.
            output_mime_type = "application/octet-stream"
            logger.info(f"Using result type: {resultType} -> {output_extension}")

            self.services.workflow.progressLogUpdate(operationId, 0.3, "Preparing documents")

            # Resolve references to ChatDocuments; the AI service handles all document processing.
            chatDocuments = []
            if documentList:
                chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList)
            if chatDocuments:
                logger.info(f"Prepared {len(chatDocuments)} documents for AI processing")

            self.services.workflow.progressLogUpdate(operationId, 0.4, "Preparing AI call")

            # Only resultFormat is set here; the service layer determines all other parameters.
            output_format = output_extension.replace('.', '') or 'txt'
            options = AiCallOptions(resultFormat=output_format)

            self.services.workflow.progressLogUpdate(operationId, 0.6, "Calling AI")

            result = await self.services.ai.callAiDocuments(
                prompt=aiPrompt,
                documents=chatDocuments if chatDocuments else None,
                options=options,
                outputFormat=output_format,
            )

            self.services.workflow.progressLogUpdate(operationId, 0.8, "Processing result")

            # The service may hand back a dict carrying a ready-made document list.
            if isinstance(result, dict) and isinstance(result.get("documents"), list):
                action_documents = [
                    ActionDocument(
                        documentName=d.get("documentName"),
                        documentData=d.get("documentData"),
                        mimeType=d.get("mimeType") or output_mime_type,
                    )
                    for d in result["documents"]
                ]
                self.services.workflow.progressLogFinish(operationId, True)
                return ActionResult.isSuccess(documents=action_documents)

            # Otherwise wrap the raw result in a single document.
            meaningful_name = self._generateMeaningfulFileName(
                base_name="ai",
                extension=output_extension.lstrip('.'),
                action_name="result",
            )
            action_document = ActionDocument(
                documentName=meaningful_name,
                documentData=result,
                mimeType=output_mime_type,
            )

            self.services.workflow.progressLogFinish(operationId, True)
            return ActionResult.isSuccess(documents=[action_document])

        except Exception as e:
            logger.exception(f"Error in AI processing: {str(e)}")
            self._finishProgressQuietly(operationId, False)
            return ActionResult.isFailure(error=str(e))

    @action
    async def webSearch(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Search the web and return a list of relevant URLs only.
        - Input requirements: searchPrompt (required); optional maxResults, timeRange, country, language.
        - Output format: JSON with search results and URLs.

        Parameters:
        - searchPrompt (str, required): Natural language search prompt describing what to search for.
        - maxResults (int, optional): Maximum number of search results. Default: 5.
        - timeRange (str, optional): d | w | m | y for time filtering.
        - country (str, optional): Country name for localized results.
        - language (str, optional): Language code (e.g., de, en, fr).
        """
        try:
            searchPrompt = parameters.get("searchPrompt")
            if not searchPrompt:
                return ActionResult.isFailure(error="Search prompt is required")

            # Unified prompt JSON; key order is kept stable because the
            # serialized dict is the prompt itself.
            promptData = {
                "searchPrompt": searchPrompt,
                "maxResults": parameters.get("maxResults", 5),
                "timeRange": parameters.get("timeRange"),
                "country": parameters.get("country"),
                "language": parameters.get("language"),
                "instructions": "Search the web and return a JSON response with a 'results' array containing objects with 'title', 'url', and optionally 'content' fields. Focus on finding relevant URLs for the search prompt.",
            }

            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.WEB_SEARCH,
                baseName="web_search",
                actionName="search",
                # Normalize the reply so consumers always see a 'results' list.
                postProcess=self._processWebSearchResult,
            )

        except Exception as e:
            logger.exception(f"Error in web search: {str(e)}")
            return ActionResult.isFailure(error=str(e))

    def _processWebSearchResult(self, result: str) -> str:
        """
        Process web search result to ensure consistent JSON format with URL list.

        Args:
            result: Raw reply of the AI service. Normally a JSON string; an
                already-parsed dict or list is accepted as well.

        Returns:
            A JSON string. A dict reply that already has a 'results' key is
            passed through (string input is returned unchanged). Otherwise all
            http(s) URLs found anywhere in the payload are listed under
            'results'. If none are found, or the payload cannot be parsed, an
            envelope with empty 'results' and the original 'raw_response' is
            returned (plus an 'error' field on parse failure).
        """
        try:
            if isinstance(result, (str, bytes, bytearray)):
                data = json.loads(result)
            else:
                # Already parsed by the service layer; use it directly.
                data = result

            # Already in the expected search result format.
            if isinstance(data, dict) and "results" in data:
                return result if isinstance(result, str) else json.dumps(data, indent=2)

            # Different JSON shape (object or top-level array): collect URLs.
            if isinstance(data, (dict, list)):
                urls = self._extractUrlsFromJson(data)
                if urls:
                    query = data.get("query", "web search") if isinstance(data, dict) else "web search"
                    processedData = {
                        "query": query,
                        "results": [{"title": f"Result {i+1}", "url": url} for i, url in enumerate(urls)],
                        "total_count": len(urls),
                    }
                    return json.dumps(processedData, indent=2)

            # No URLs found, return original result in a structured format
            processedData = {
                "query": "web search",
                "results": [],
                "total_count": 0,
                "raw_response": result,
            }
            return json.dumps(processedData, indent=2)

        except Exception as e:
            logger.warning(f"Error processing web search result: {str(e)}")
            errorData = {
                "query": "web search",
                "results": [],
                "total_count": 0,
                "error": f"Failed to process result: {str(e)}",
                "raw_response": result,
            }
            # default=str is deliberate: this last-resort envelope must not
            # raise even when the raw response is not JSON-serializable.
            return json.dumps(errorData, indent=2, default=str)

    def _extractUrlsFromJson(self, data: Any) -> List[str]:
        """Extract URLs from JSON data structure.

        Walks nested dicts and lists and collects every string value starting
        with 'http://' or 'https://'. Dict keys are not inspected.

        Returns:
            Unique URLs in first-seen order, so the numbering derived from the
            list is stable between runs.
        """
        urls: List[str] = []

        def _extractFromValue(value: Any) -> None:
            if isinstance(value, str):
                if value.startswith(('http://', 'https://')):
                    urls.append(value)
            elif isinstance(value, dict):
                for v in value.values():
                    _extractFromValue(v)
            elif isinstance(value, list):
                for item in value:
                    _extractFromValue(item)

        _extractFromValue(data)
        # dict.fromkeys removes duplicates while keeping insertion order.
        return list(dict.fromkeys(urls))

    @action
    async def webCrawl(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Extract content from specific URLs.
        - Input requirements: urls (required); optional extractDepth, format.
        - Output format: JSON with extracted content from URLs.

        Parameters:
        - urls (list, required): List of URLs to crawl and extract content from.
        - extractDepth (str, optional): basic | advanced. Default: advanced.
        - format (str, optional): markdown | html | text. Default: markdown.
        """
        try:
            urls = parameters.get("urls")
            if not urls or not isinstance(urls, list):
                return ActionResult.isFailure(error="URLs list is required")

            promptData = {
                "urls": urls,
                "extractDepth": parameters.get("extractDepth", "advanced"),
                "format": parameters.get("format", "markdown"),
                "instructions": "Extract content from the provided URLs and return a JSON response with 'results' array containing objects with 'url', 'title', 'content', and 'extractedAt' fields.",
            }

            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.WEB_CRAWL,
                baseName="web_crawl",
                actionName="crawl",
            )

        except Exception as e:
            logger.exception(f"Error in web crawl: {str(e)}")
            return ActionResult.isFailure(error=str(e))

    @action
    async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Comprehensive web research combining search and content extraction.
        - Input requirements: researchPrompt (required); optional maxResults, urls, timeRange, country, language.
        - Output format: JSON with research results, sources, and analysis.

        Parameters:
        - researchPrompt (str, required): Natural language research prompt describing what to research.
        - maxResults (int, optional): Maximum search results. Default: 5.
        - urls (list, optional): Specific URLs to include in research.
        - timeRange (str, optional): d | w | m | y for time filtering.
        - country (str, optional): Country name for localized results.
        - language (str, optional): Language code (e.g., de, en, fr).
        """
        try:
            researchPrompt = parameters.get("researchPrompt")
            if not researchPrompt:
                return ActionResult.isFailure(error="Research prompt is required")

            promptData = {
                "researchPrompt": researchPrompt,
                "maxResults": parameters.get("maxResults", 5),
                "urls": parameters.get("urls"),
                "timeRange": parameters.get("timeRange"),
                "country": parameters.get("country"),
                "language": parameters.get("language"),
                "instructions": "Conduct comprehensive web research and return a JSON response with 'results' array containing objects with 'title', 'url', 'content', and 'analysis' fields. Provide detailed analysis and insights.",
            }

            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.WEB_RESEARCH,
                baseName="web_research",
                actionName="research",
            )

        except Exception as e:
            logger.exception(f"Error in web research: {str(e)}")
            return ActionResult.isFailure(error=str(e))

    @action
    async def webQuestions(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Answer questions using web research and AI analysis.
        - Input requirements: question (required); optional context, maxResults, timeRange, country, language.
        - Output format: JSON with question answer and supporting sources.

        Parameters:
        - question (str, required): Question to be answered using web research.
        - context (str, optional): Additional context for the question.
        - maxResults (int, optional): Maximum search results. Default: 5.
        - timeRange (str, optional): d | w | m | y for time filtering.
        - country (str, optional): Country name for localized results.
        - language (str, optional): Language code (e.g., de, en, fr).
        """
        try:
            question = parameters.get("question")
            if not question:
                return ActionResult.isFailure(error="Question is required")

            promptData = {
                "question": question,
                "context": parameters.get("context", ""),
                "maxResults": parameters.get("maxResults", 5),
                "timeRange": parameters.get("timeRange"),
                "country": parameters.get("country"),
                "language": parameters.get("language"),
                "instructions": "Answer the question using web research and return a JSON response with 'answer', 'sources' array containing objects with 'title', 'url', 'content', and 'relevance' fields.",
            }

            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.WEB_QUESTIONS,
                baseName="web_questions",
                actionName="questions",
            )

        except Exception as e:
            logger.exception(f"Error in web questions: {str(e)}")
            return ActionResult.isFailure(error=str(e))

    @action
    async def webNews(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Search and analyze news articles on specific topics.
        - Input requirements: newsPrompt (required); optional maxResults, timeRange, country, language.
        - Output format: JSON with news articles, summaries, and analysis.

        Parameters:
        - newsPrompt (str, required): Natural language prompt describing what news to search for.
        - maxResults (int, optional): Maximum news articles. Default: 5.
        - timeRange (str, optional): d | w | m | y for time filtering. Default: w.
        - country (str, optional): Country name for localized news.
        - language (str, optional): Language code (e.g., de, en, fr).
        """
        try:
            newsPrompt = parameters.get("newsPrompt")
            if not newsPrompt:
                return ActionResult.isFailure(error="News prompt is required")

            promptData = {
                "newsPrompt": newsPrompt,
                "maxResults": parameters.get("maxResults", 5),
                "timeRange": parameters.get("timeRange", "w"),  # Default to week
                "country": parameters.get("country"),
                "language": parameters.get("language"),
                "instructions": "Find and analyze recent news articles and return a JSON response with 'articles' array containing objects with 'title', 'url', 'content', 'date', 'source', and 'summary' fields.",
            }

            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.WEB_NEWS,
                baseName="web_news",
                actionName="news",
            )

        except Exception as e:
            logger.exception(f"Error in web news: {str(e)}")
            return ActionResult.isFailure(error=str(e))

    @action
    async def generateImage(self, parameters: Dict[str, Any]) -> ActionResult:
        """
        GENERAL:
        - Purpose: Generate images using AI based on text prompts.
        - Input requirements: prompt (required); optional size, quality, style.
        - Output format: Base64 encoded image data.

        Parameters:
        - prompt (str, required): Text description of the image to generate.
        - size (str, optional): Image size. Options: 1024x1024, 1792x1024, 1024x1792. Default: 1024x1024.
        - quality (str, optional): Image quality. Options: standard, hd. Default: standard.
        - style (str, optional): Image style. Options: vivid, natural. Default: vivid.
        """
        try:
            prompt = parameters.get("prompt")
            if not prompt:
                return ActionResult.isFailure(error="Image prompt is required")

            promptData = {
                "prompt": prompt,
                "size": parameters.get("size", "1024x1024"),
                "quality": parameters.get("quality", "standard"),
                "style": parameters.get("style", "vivid"),
                "instructions": "Generate an image based on the prompt and return the base64 encoded image data.",
            }

            # The base64 reply is stored as-is in a document named *.png.
            return await self._callAiWithJsonPrompt(
                promptData,
                OperationTypeEnum.IMAGE_GENERATE,
                baseName="generated_image",
                actionName="generate",
                resultFormat="base64",
                extension="png",
                mimeType="image/png",
            )

        except Exception as e:
            logger.exception(f"Error in image generation: {str(e)}")
            return ActionResult.isFailure(error=str(e))