""" AI processing method module. Handles direct AI calls for any type of task. """ import logging from typing import Dict, Any, List, Optional from datetime import datetime, UTC from modules.workflows.methods.methodBase import MethodBase, action from modules.datamodels.datamodelChat import ActionResult from modules.datamodels.datamodelAi import AiCallOptions, OperationType, Priority from modules.datamodels.datamodelChat import ChatDocument from modules.datamodels.datamodelWeb import WebResearchRequest, WebResearchOptions logger = logging.getLogger(__name__) class MethodAi(MethodBase): """AI processing methods.""" def __init__(self, services): super().__init__(services) self.name = "ai" self.description = "AI processing methods" def _format_timestamp_for_filename(self) -> str: """Format current timestamp as YYYYMMDD-hhmmss for filenames.""" return datetime.now(UTC).strftime("%Y%m%d-%H%M%S") @action async def process(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: AI-based analysis and content generation with optional document context. - Input requirements: aiPrompt (required); optional documentList, resultType, processingMode, includeMetadata, operationType, priority, maxCost, maxProcessingTime, requiredTags. - Output format: Single or multiple documents in requested format. Parameters: - aiPrompt (str, required): Instruction for the AI. - documentList (list, optional): Document reference(s) for context. - resultType (str, optional): Output extension (txt, json, md, csv, xml, html, pdf, docx, xlsx, png). Default: txt. - processingMode (str, optional): basic | advanced | detailed. Default: basic. - includeMetadata (bool, optional): Include metadata when available. Default: True. - operationType (str, optional): general | generate_plan | analyse_content | generate_content | web_research | image_analysis | image_generation. Default: general. - priority (str, optional): speed | quality | cost | balanced. Default: balanced. - maxCost (float, optional): Cost limit. - maxProcessingTime (int, optional): Time limit in seconds. - requiredTags (list, optional): Capability tags (e.g., text, chat, reasoning, analysis, image, vision, web, search). """ try: # Debug logging to see what parameters are received logger.info(f"MethodAi.process received parameters: {parameters}") logger.info(f"Parameters type: {type(parameters)}") logger.info(f"Parameters keys: {list(parameters.keys()) if isinstance(parameters, dict) else 'Not a dict'}") aiPrompt = parameters.get("aiPrompt") logger.info(f"aiPrompt extracted: '{aiPrompt}' (type: {type(aiPrompt)})") documentList = parameters.get("documentList", []) if isinstance(documentList, str): documentList = [documentList] resultType = parameters.get("resultType", "txt") processingMode = parameters.get("processingMode", "basic") includeMetadata = parameters.get("includeMetadata", True) operationType = parameters.get("operationType", "general") priority = parameters.get("priority", "balanced") maxCost = parameters.get("maxCost") maxProcessingTime = parameters.get("maxProcessingTime") requiredTags = parameters.get("requiredTags") if not aiPrompt: logger.error(f"aiPrompt is missing or empty. Parameters: {parameters}") return ActionResult.isFailure( error="AI prompt is required" ) # Determine output extension and default MIME type without duplicating service logic normalized_result_type = (str(resultType).strip().lstrip('.').lower() or "txt") output_extension = f".{normalized_result_type}" output_mime_type = "application/octet-stream" # Prefer service-provided mimeType when available logger.info(f"Using result type: {resultType} -> {output_extension}") # Get ChatDocuments for AI service - let AI service handle all document processing chatDocuments = [] if documentList: chatDocuments = self.services.workflow.getChatDocumentsFromDocumentList(documentList) if chatDocuments: logger.info(f"Prepared {len(chatDocuments)} documents for AI processing") # Build enhanced prompt enhanced_prompt = aiPrompt # Add processing mode instructions if specified (generic, not analysis-specific) if processingMode == "detailed": enhanced_prompt += "\n\nPlease provide a detailed response with comprehensive information." elif processingMode == "advanced": enhanced_prompt += "\n\nPlease provide an advanced response with deep insights." # Note: customInstructions parameter was removed as it's not defined in the method signature # Add format guidance to prompt if normalized_result_type != "txt": enhanced_prompt += f"\n\nPlease deliver the result in {normalized_result_type.upper()} format. Ensure the output follows the proper {normalized_result_type.upper()} syntax and structure." # Build options and delegate document handling to AI/Extraction/Generation services output_format = output_extension.replace('.', '') or 'txt' options = AiCallOptions( operationType=operationType, priority=priority, compressPrompt=processingMode != "detailed", compressContext=True, processDocumentsIndividually=True, processingMode=processingMode, resultFormat=output_format, maxCost=maxCost, maxProcessingTime=maxProcessingTime, requiredTags=requiredTags ) supported_generation_formats = {"html", "pdf", "docx", "txt", "md", "json", "csv", "xlsx"} output_format_arg = output_format if output_format in supported_generation_formats else None result = await self.services.ai.callAi( prompt=enhanced_prompt, documents=chatDocuments if chatDocuments else None, options=options, outputFormat=output_format_arg ) from modules.datamodels.datamodelChat import ActionDocument if isinstance(result, dict) and isinstance(result.get("documents"), list): action_documents = [] for d in result["documents"]: action_documents.append(ActionDocument( documentName=d.get("documentName"), documentData=d.get("documentData"), mimeType=d.get("mimeType") or output_mime_type )) return ActionResult.isSuccess(documents=action_documents) extension = output_extension.lstrip('.') meaningful_name = self._generateMeaningfulFileName( base_name="ai", extension=extension, action_name="result" ) action_document = ActionDocument( documentName=meaningful_name, documentData=result, mimeType=output_mime_type ) return ActionResult.isSuccess(documents=[action_document]) except Exception as e: logger.error(f"Error in AI processing: {str(e)}") return ActionResult.isFailure( error=str(e) ) @action async def webResearch(self, parameters: Dict[str, Any]) -> ActionResult: """ GENERAL: - Purpose: Web research and information gathering with basic analysis and sources. - Input requirements: user_prompt (required); optional urls, max_results, max_pages, search_depth, extract_depth, pages_search_depth, country, time_range, topic, language. - Output format: JSON with results and sources. Parameters: - user_prompt (str, required): Research question or topic. - urls (list, optional): Specific URLs to crawl. - max_results (int, optional): Max search results. Default: 5. - max_pages (int, optional): Max pages to crawl per site. Default: 5. - search_depth (str, optional): basic | advanced. Default: basic. - extract_depth (str, optional): basic | advanced. Default: advanced. - pages_search_depth (int, optional): Crawl depth level. Default: 2. - country (str, optional): Full English country name (ISO-3166; map codes via pycountry/i18n-iso-countries). - time_range (str, optional): d | w | m | y. - topic (str, optional): general | news | academic. - language (str, optional): Language code (e.g., de, en, fr). """ try: user_prompt = parameters.get("user_prompt") urls = parameters.get("urls") max_results = parameters.get("max_results", 5) max_pages = parameters.get("max_pages", 5) search_depth = parameters.get("search_depth", "basic") extract_depth = parameters.get("extract_depth", "advanced") pages_search_depth = parameters.get("pages_search_depth", 2) country = parameters.get("country") time_range = parameters.get("time_range") topic = parameters.get("topic") language = parameters.get("language") if not user_prompt: return ActionResult.isFailure( error="Search query is required" ) # Build WebResearchOptions options = WebResearchOptions( max_pages=max_pages, search_depth=search_depth, extract_depth=extract_depth, pages_search_depth=pages_search_depth, country=country, time_range=time_range, topic=topic, language=language ) # Build WebResearchRequest request = WebResearchRequest( user_prompt=user_prompt, urls=urls, max_results=max_results, options=options ) # Call web research service logger.info(f"Performing comprehensive web research for: {user_prompt}") logger.info(f"Max results: {max_results}, Max pages: {max_pages}") if urls: logger.info(f"Using provided URLs: {len(urls)}") result = await self.services.ai.webResearch(request) if not result.success: return ActionResult.isFailure(error=result.error) # Convert WebResearchActionResult to ActionResult format documents = [] for doc in result.documents: documents.append({ "documentName": doc.documentName, "documentData": { "user_prompt": doc.documentData.user_prompt, "websites_analyzed": doc.documentData.websites_analyzed, "additional_links_found": doc.documentData.additional_links_found, "analysis_result": doc.documentData.analysis_result, "sources": [{"title": s.title, "url": str(s.url)} for s in doc.documentData.sources], "additional_links": doc.documentData.additional_links, "debug_info": doc.documentData.debug_info }, "mimeType": doc.mimeType }) # Return result in the standard ActionResult format return ActionResult.isSuccess( documents=documents ) except Exception as e: logger.error(f"Error in web research: {str(e)}") return ActionResult.isFailure( error=str(e) ) def _mergeDataChunks(self, chunks: List[str], resultType: str, mimeType: str) -> str: """Intelligently merge data chunks using strategies based on content type""" try: if resultType == "json": return self._mergeJsonChunks(chunks) elif resultType in ["csv", "table"]: return self._mergeTableChunks(chunks) elif resultType in ["txt", "md", "text"]: return self._mergeTextChunks(chunks) else: # Default: simple concatenation return "\n".join(str(chunk) for chunk in chunks) except Exception as e: logger.warning(f"Failed to merge chunks intelligently: {str(e)}, using simple concatenation") return "\n".join(str(chunk) for chunk in chunks) def _mergeJsonChunks(self, chunks: List[str]) -> str: """Merge JSON chunks intelligently""" import json merged_data = [] for i, chunk in enumerate(chunks): try: if isinstance(chunk, str): chunk_data = json.loads(chunk) else: chunk_data = chunk if isinstance(chunk_data, list): merged_data.extend(chunk_data) elif isinstance(chunk_data, dict): # For objects, merge by combining keys if not merged_data: merged_data = chunk_data else: if isinstance(merged_data, dict): merged_data.update(chunk_data) else: merged_data.append(chunk_data) else: merged_data.append(chunk_data) except Exception as e: logger.warning(f"Failed to parse chunk {i}: {str(e)}") # Add as string if JSON parsing fails merged_data.append(str(chunk)) return json.dumps(merged_data, indent=2) def _mergeTableChunks(self, chunks: List[str]) -> str: """Merge table chunks (CSV) intelligently""" import csv import io merged_rows = [] headers = None for i, chunk in enumerate(chunks): try: # Parse CSV chunk reader = csv.reader(io.StringIO(str(chunk))) rows = list(reader) if not rows: continue # First chunk: capture headers if i == 0: headers = rows[0] if rows else [] merged_rows.extend(rows) else: # Subsequent chunks: skip header if it matches if rows and rows[0] == headers: merged_rows.extend(rows[1:]) # Skip duplicate header else: merged_rows.extend(rows) except Exception as e: logger.warning(f"Failed to parse table chunk {i}: {str(e)}") # Add as raw text if CSV parsing fails merged_rows.append([f"Raw chunk {i}: {str(chunk)[:100]}..."]) # Convert back to CSV output = io.StringIO() writer = csv.writer(output) writer.writerows(merged_rows) return output.getvalue() def _mergeTextChunks(self, chunks: List[str]) -> str: """Merge text chunks intelligently""" # Simple concatenation with proper spacing merged = [] for chunk in chunks: chunk_str = str(chunk).strip() if chunk_str: merged.append(chunk_str) return "\n\n".join(merged) # Double newline between chunks for readability